use super::{
planner::{ExecutionStep, IndexSelection, ParallelizationInfo, QueryPlan, StepType},
ComparisonOperator, Condition,
};
use crate::{
schema::SchemaManager, storage::StorageEngine, Config, Error, Result, RowKey, TableId, Value,
};
use crossbeam::channel;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
pub use super::result::{QueryResult, QueryRow};
/// Worker count used by the fallback parallelization settings below.
const DEFAULT_PARALLEL_WORKERS: usize = 4;
/// Fallback parallelization settings, used by `execute_parallel_table_scan` when no
/// execution step reports `can_parallelize`.
static DEFAULT_PARALLELIZATION: ParallelizationInfo = ParallelizationInfo {
can_parallelize: true,
suggested_threads: DEFAULT_PARALLEL_WORKERS,
partition_key: None,
};
/// Executes [`QueryPlan`]s against the storage engine and produces [`QueryResult`]s.
#[derive(Debug, Clone)]
pub struct QueryExecutor {
/// Storage engine that every read and write in this executor goes through.
storage: Arc<StorageEngine>,
/// Schema manager handle; stored at construction but not read by the code visible here.
_schema: Arc<SchemaManager>,
/// Clone of the configuration passed to `new`; in the visible code only the unit tests read it.
_config: Config,
}
impl QueryExecutor {
pub fn new(storage: Arc<StorageEngine>, schema: Arc<SchemaManager>, config: &Config) -> Self {
Self {
storage,
_schema: schema,
_config: config.clone(),
}
}
/// Runs `plan` and returns its result, stamped with timing and plan metadata.
///
/// Dispatch is on `plan.plan_type`. `TableScan` plans are split further:
/// * a plan containing an `Insert` step takes the insert path, which only exists with the
///   `experimental` feature (otherwise `Error::UnsupportedFormat` is returned);
/// * a plan with no steps, a table and zero estimated rows is treated as CREATE TABLE;
/// * everything else is a regular table scan.
///
/// # Errors
/// Propagates any error from the selected execution path; no metadata is attached in that case.
pub async fn execute(&self, plan: &QueryPlan) -> Result<QueryResult> {
    use super::planner::PlanType;

    let started = Instant::now();
    let is_create_table =
        plan.estimated_rows == 0 && plan.table.is_some() && plan.steps.is_empty();
    let has_insert_step = plan
        .steps
        .iter()
        .any(|step| matches!(step.step_type, StepType::Insert));
    #[cfg(debug_assertions)]
    eprintln!(
        "DEBUG: Plan steps: {:?}, has_insert_step: {}, is_create_table: {}",
        plan.steps.iter().map(|s| &s.step_type).collect::<Vec<_>>(),
        has_insert_step,
        is_create_table
    );

    // The guarded `TableScan` arms must stay ahead of the unguarded one.
    let mut query_result = match plan.plan_type {
        PlanType::PointLookup => self.execute_point_lookup(plan).await,
        PlanType::IndexScan => self.execute_index_scan(plan).await,
        PlanType::RangeScan => self.execute_range_scan(plan).await,
        PlanType::TableScan if has_insert_step => {
            #[cfg(feature = "experimental")]
            {
                self.execute_insert_operation(plan).await
            }
            #[cfg(not(feature = "experimental"))]
            {
                Err(Error::UnsupportedFormat(
                    "INSERT operations require the 'experimental' feature. \
Add 'experimental' to your Cargo.toml features."
                        .to_string(),
                ))
            }
        }
        PlanType::TableScan if is_create_table => {
            self.execute_create_table_operation(plan).await
        }
        PlanType::TableScan => self.execute_table_scan(plan).await,
        PlanType::Join => self.execute_join(plan).await,
        PlanType::Aggregation => self.execute_aggregation(plan).await,
        PlanType::Subquery => self.execute_subquery(plan).await,
    }?;

    let elapsed_ms = started.elapsed().as_millis() as u64;
    #[cfg(debug_assertions)]
    eprintln!(
        "DEBUG: Final result before metadata update - rows_affected: {}",
        query_result.rows_affected
    );

    // Parallelization metadata is taken from the first step that allows parallel execution.
    let parallelization = plan
        .steps
        .iter()
        .find(|s| s.parallelization.can_parallelize)
        .map(|s| super::result::ParallelizationInfo {
            threads_used: s.parallelization.suggested_threads,
            effective: true,
            partitions: Vec::new(),
        });
    let steps = plan
        .steps
        .iter()
        .map(|s| format!("{:?}", s.step_type))
        .collect();

    query_result.execution_time_ms = elapsed_ms;
    query_result.metadata.plan_info = Some(super::result::PlanInfo {
        plan_type: format!("{:?}", plan.plan_type),
        estimated_cost: plan.estimated_cost,
        // The measured wall-clock milliseconds double as the "actual cost".
        actual_cost: elapsed_ms as f64,
        indexes_used: Vec::new(),
        steps,
        parallelization,
    });
    Ok(query_result)
}
fn require_table<'a>(&self, plan: &'a QueryPlan) -> Result<&'a TableId> {
plan.table
.as_ref()
.ok_or_else(|| Error::query_execution("Missing table in plan"))
}
/// Returns the first condition on `column`, searching the steps in order and each step's
/// conditions in order; `None` when no step has one.
fn find_condition<'a>(steps: &'a [ExecutionStep], column: &str) -> Option<&'a Condition> {
    for step in steps {
        for candidate in &step.conditions {
            if candidate.column == column {
                return Some(candidate);
            }
        }
    }
    None
}
/// Converts `(key, stored value)` pairs into query rows, preserving their order.
///
/// # Errors
/// Stops at, and returns, the first conversion error.
fn scan_pairs_to_rows(&self, pairs: Vec<(RowKey, Value)>) -> Result<Vec<QueryRow>> {
    pairs
        .into_iter()
        .map(|(key, payload)| self.storage_data_to_query_row(payload, &key))
        .collect()
}
/// Scans `table` with every optional scan argument left as `None` and converts the
/// returned pairs into query rows.
///
/// # Errors
/// Propagates storage scan errors and row conversion errors.
async fn full_scan_rows(&self, table: &TableId) -> Result<Vec<QueryRow>> {
    let pairs = self.storage.scan(table, None, None, None, None).await?;
    pairs
        .into_iter()
        .map(|(key, payload)| self.storage_data_to_query_row(payload, &key))
        .collect()
}
/// Fetches the single row addressed by `condition`.
///
/// The row key is derived with `condition_to_row_key`. The result holds one row when the
/// key exists and is empty otherwise.
///
/// # Errors
/// Propagates key derivation, storage and row conversion errors.
async fn point_lookup_rows(
    &self,
    table: &TableId,
    condition: &Condition,
) -> Result<Vec<QueryRow>> {
    let key = self.condition_to_row_key(condition)?;
    let Some(payload) = self.storage.get(table, &key).await? else {
        return Ok(Vec::new());
    };
    let row = self.storage_data_to_query_row(payload, &key)?;
    Ok(vec![row])
}
/// Wraps `rows` in a `QueryResult` via `QueryResult::with_rows`.
fn make_result(rows: Vec<QueryRow>) -> QueryResult {
QueryResult::with_rows(rows)
}
async fn execute_point_lookup(&self, plan: &QueryPlan) -> Result<QueryResult> {
let table = self.require_table(plan)?;
let lookup_condition = plan
.steps
.iter()
.find_map(|step| step.conditions.first())
.ok_or_else(|| Error::query_execution("No lookup condition found"))?;
let row_key = self.condition_to_row_key(lookup_condition)?;
#[cfg(debug_assertions)]
eprintln!(
"DEBUG: SELECT point lookup using row key: {:?}",
std::str::from_utf8(row_key.as_bytes()).unwrap_or("<invalid-utf8>")
);
let mut rows = Vec::new();
if let Some(row_data) = self.storage.get(table, &row_key).await? {
rows.push(self.storage_data_to_query_row(row_data, &row_key)?);
}
Ok(Self::make_result(rows))
}
async fn execute_index_scan(&self, plan: &QueryPlan) -> Result<QueryResult> {
let table = self.require_table(plan)?;
let index_selection = plan
.selected_indexes
.first()
.ok_or_else(|| Error::query_execution("No index selected"))?;
let mut rows = match index_selection.index_type {
super::planner::IndexType::Secondary => {
self.execute_secondary_index_scan(table, index_selection, &plan.steps)
.await?
}
super::planner::IndexType::BloomFilter => {
self.execute_bloom_filter_scan(table, index_selection, &plan.steps)
.await?
}
super::planner::IndexType::Primary => {
self.execute_primary_index_scan(table, index_selection, &plan.steps)
.await?
}
super::planner::IndexType::Composite => {
self.execute_composite_index_scan(table, index_selection, &plan.steps)
.await?
}
};
rows = self.apply_execution_steps(rows, &plan.steps).await?;
Ok(Self::make_result(rows))
}
/// Executes a range-scan plan.
///
/// No key bounds are passed to storage: the whole table is scanned and the plan's steps
/// (filters, sort, projection) are applied to the scanned rows.
///
/// # Errors
/// Fails when the plan has no table, and propagates scan and step errors.
async fn execute_range_scan(&self, plan: &QueryPlan) -> Result<QueryResult> {
    let table = self.require_table(plan)?;
    let scanned = self.full_scan_rows(table).await?;
    let shaped = self.apply_execution_steps(scanned, &plan.steps).await?;
    Ok(Self::make_result(shaped))
}
/// Executes a table-scan plan.
///
/// Uses the parallel scan path when any step reports `can_parallelize`, the plain full
/// scan otherwise, then applies the plan's steps to the scanned rows.
///
/// # Errors
/// Fails when the plan has no table, and propagates scan and step errors.
async fn execute_table_scan(&self, plan: &QueryPlan) -> Result<QueryResult> {
    let table = self.require_table(plan)?;
    #[cfg(debug_assertions)]
    log::debug!("executor: Scanning for table: {:?}", table.name());
    let wants_parallel = plan
        .steps
        .iter()
        .any(|step| step.parallelization.can_parallelize);
    let scanned = if !wants_parallel {
        self.full_scan_rows(table).await?
    } else {
        self.execute_parallel_table_scan(table, &plan.steps).await?
    };
    let shaped = self.apply_execution_steps(scanned, &plan.steps).await?;
    Ok(Self::make_result(shaped))
}
/// Placeholder for join plans: the plan is ignored and `QueryResult::new()` is returned.
// NOTE(review): no join is performed, yet the call succeeds — confirm callers expect this.
async fn execute_join(&self, _plan: &QueryPlan) -> Result<QueryResult> {
Ok(QueryResult::new())
}
/// Placeholder for aggregation plans: the plan is ignored and `QueryResult::new()` is returned.
// NOTE(review): no aggregation is performed, yet the call succeeds — confirm callers expect this.
async fn execute_aggregation(&self, _plan: &QueryPlan) -> Result<QueryResult> {
Ok(QueryResult::new())
}
/// Placeholder for subquery plans: the plan is ignored and `QueryResult::new()` is returned.
// NOTE(review): no subquery is evaluated, yet the call succeeds — confirm callers expect this.
async fn execute_subquery(&self, _plan: &QueryPlan) -> Result<QueryResult> {
Ok(QueryResult::new())
}
async fn execute_secondary_index_scan(
&self,
table: &TableId,
index_selection: &IndexSelection,
steps: &[ExecutionStep],
) -> Result<Vec<QueryRow>> {
Self::find_condition(steps, &index_selection.columns[0])
.ok_or_else(|| Error::query_execution("No condition found for index"))?;
self.full_scan_rows(table).await
}
async fn execute_bloom_filter_scan(
&self,
table: &TableId,
index_selection: &IndexSelection,
steps: &[ExecutionStep],
) -> Result<Vec<QueryRow>> {
let condition = Self::find_condition(steps, &index_selection.columns[0])
.ok_or_else(|| Error::query_execution("No condition found for bloom filter"))?;
self.point_lookup_rows(table, condition).await
}
async fn execute_primary_index_scan(
&self,
table: &TableId,
index_selection: &IndexSelection,
steps: &[ExecutionStep],
) -> Result<Vec<QueryRow>> {
let condition = Self::find_condition(steps, &index_selection.columns[0])
.ok_or_else(|| Error::query_execution("No condition found for primary key"))?;
self.point_lookup_rows(table, condition).await
}
/// Produces candidate rows for a composite-index selection.
///
/// Neither the index selection nor the steps are used: the whole table is scanned and
/// every row is returned.
async fn execute_composite_index_scan(
&self,
table: &TableId,
_index_selection: &IndexSelection,
_steps: &[ExecutionStep],
) -> Result<Vec<QueryRow>> {
self.full_scan_rows(table).await
}
/// Scans `table` for a plan whose steps allow parallel execution.
///
/// Exactly one unbounded scan is issued. The scan call used here takes no partition
/// bounds, so spawning `suggested_threads` workers that each run it would return every
/// row once per worker. The requested worker count (from the first parallelizable step,
/// or `DEFAULT_PARALLELIZATION`) is therefore only logged.
///
/// The scan is awaited directly rather than collected through a blocking channel
/// receive, which could stall a current-thread async runtime.
///
/// # Errors
/// Propagates storage scan errors and row conversion errors instead of logging and
/// dropping them.
// NOTE(review): reintroduce real fan-out once storage exposes range-partitioned scans.
async fn execute_parallel_table_scan(
    &self,
    table: &TableId,
    steps: &[ExecutionStep],
) -> Result<Vec<QueryRow>> {
    let requested_workers = steps
        .iter()
        .map(|step| &step.parallelization)
        .find(|info| info.can_parallelize)
        .unwrap_or(&DEFAULT_PARALLELIZATION)
        .suggested_threads;
    log::debug!(
        "executor: parallel scan requested with {} workers; running one unpartitioned scan",
        requested_workers
    );
    let pairs = self.storage.scan(table, None, None, None, None).await?;
    self.scan_pairs_to_rows(pairs)
}
/// Runs `rows` through `steps` in order.
///
/// Only `Filter`, `Sort` and `Project` steps transform the rows; `Limit`, `Aggregate`,
/// `Join`, `Scan` and `Insert` steps pass them through unchanged.
///
/// # Errors
/// Propagates the first error raised by a filter step.
async fn apply_execution_steps(
    &self,
    mut rows: Vec<QueryRow>,
    steps: &[ExecutionStep],
) -> Result<Vec<QueryRow>> {
    for step in steps {
        rows = match step.step_type {
            StepType::Filter => self.apply_filter_step(rows, step)?,
            StepType::Sort => self.apply_sort_step(rows, step),
            StepType::Project => self.apply_project_step(rows, step),
            StepType::Limit
            | StepType::Aggregate
            | StepType::Join
            | StepType::Scan
            | StepType::Insert => rows,
        };
    }
    Ok(rows)
}
/// Keeps the rows that satisfy every condition of `step`, preserving their order.
///
/// Conditions are checked in order and checking stops at the first one a row fails.
///
/// # Errors
/// Returns the first condition evaluation error encountered.
fn apply_filter_step(
    &self,
    rows: Vec<QueryRow>,
    step: &ExecutionStep,
) -> Result<Vec<QueryRow>> {
    let mut survivors = Vec::with_capacity(rows.len());
    'rows: for candidate in rows {
        for condition in &step.conditions {
            if !self.evaluate_condition(&candidate, condition)? {
                continue 'rows;
            }
        }
        survivors.push(candidate);
    }
    Ok(survivors)
}
/// Sorts `rows` ascending by the first column named in `step.columns`.
///
/// Rows are returned untouched when the step names no column. A missing column value is
/// treated as `Value::Null`, and pairs that `compare_values` cannot order are treated as
/// equal. The sort is stable.
fn apply_sort_step(&self, mut rows: Vec<QueryRow>, step: &ExecutionStep) -> Vec<QueryRow> {
    if let Some(sort_column) = step.columns.first() {
        let missing = Value::Null;
        rows.sort_by(|left, right| {
            let lhs = left.values.get(sort_column).unwrap_or(&missing);
            let rhs = right.values.get(sort_column).unwrap_or(&missing);
            match self.compare_values(lhs, rhs) {
                Ok(order) => order,
                Err(_) => Ordering::Equal,
            }
        });
    }
    rows
}
/// Reduces every row to the columns listed in `step.columns`.
///
/// Columns a row does not have are skipped, so a projected row may hold fewer values
/// than the step lists. Row keys are carried over unchanged.
fn apply_project_step(&self, rows: Vec<QueryRow>, step: &ExecutionStep) -> Vec<QueryRow> {
    let mut projected = Vec::with_capacity(rows.len());
    for row in rows {
        let kept: HashMap<_, _> = step
            .columns
            .iter()
            .filter_map(|name| {
                row.values
                    .get(name)
                    .map(|cell| (name.clone(), cell.clone()))
            })
            .collect();
        projected.push(QueryRow::with_values(row.key, kept));
    }
    projected
}
/// Evaluates `condition` against `row`.
///
/// A column missing from the row is treated as `Value::Null`. Operator semantics as
/// implemented here:
/// * `Equal` / `In` — the row value equals the condition value;
/// * `NotEqual` / `NotIn` — the row value differs from the condition value;
/// * ordering operators — decided by `compare_values`;
/// * `Like` — both values are text and the row text contains the pattern as a substring;
/// * `NotLike` — the negation of `Like` (so non-text operands yield `true`).
///
/// # Errors
/// Ordering operators propagate the error `compare_values` returns for operands it
/// cannot order.
fn evaluate_condition(&self, row: &QueryRow, condition: &Condition) -> Result<bool> {
    let expected = &condition.value;
    let actual = row.values.get(&condition.column).unwrap_or(&Value::Null);
    // Both helpers are lazy so only the operators that need them pay for them.
    let ordering = || self.compare_values(actual, expected);
    let substring_hit = || match (actual, expected) {
        (Value::Text(text), Value::Text(needle)) => Some(text.contains(needle.as_str())),
        _ => None,
    };
    let verdict = match condition.operator {
        ComparisonOperator::Equal | ComparisonOperator::In => actual == expected,
        ComparisonOperator::NotEqual | ComparisonOperator::NotIn => actual != expected,
        ComparisonOperator::LessThan => ordering()? == Ordering::Less,
        ComparisonOperator::LessThanOrEqual => ordering()? != Ordering::Greater,
        ComparisonOperator::GreaterThan => ordering()? == Ordering::Greater,
        ComparisonOperator::GreaterThanOrEqual => ordering()? != Ordering::Less,
        ComparisonOperator::Like => substring_hit().unwrap_or(false),
        ComparisonOperator::NotLike => !substring_hit().unwrap_or(false),
    };
    Ok(verdict)
}
/// Orders two values of the same kind.
///
/// Supported pairs are integer, big integer, float, text, boolean and UUID values of
/// matching kind. `Null` equals `Null` and sorts before every non-null value. Floats that
/// have no defined order (`partial_cmp` returns `None`, e.g. NaN) compare as equal.
///
/// # Errors
/// Returns a query-execution error for any other pairing, including values of different
/// kinds.
fn compare_values(&self, a: &Value, b: &Value) -> Result<Ordering> {
    match (a, b) {
        (Value::Integer(a), Value::Integer(b)) => Ok(a.cmp(b)),
        // `BigInt` is accepted as a row key elsewhere in this file, so two of them must be
        // orderable instead of being rejected as "different types".
        (Value::BigInt(a), Value::BigInt(b)) => Ok(a.cmp(b)),
        (Value::Float(a), Value::Float(b)) => Ok(a.partial_cmp(b).unwrap_or(Ordering::Equal)),
        (Value::Text(a), Value::Text(b)) => Ok(a.cmp(b)),
        (Value::Boolean(a), Value::Boolean(b)) => Ok(a.cmp(b)),
        (Value::Uuid(a), Value::Uuid(b)) => Ok(a.cmp(b)),
        (Value::Null, Value::Null) => Ok(Ordering::Equal),
        (Value::Null, _) => Ok(Ordering::Less),
        (_, Value::Null) => Ok(Ordering::Greater),
        _ => Err(Error::query_execution(
            "Cannot compare values of different types",
        )),
    }
}
fn value_to_row_key(&self, value: &Value) -> Result<RowKey> {
match value {
Value::Integer(i) => Ok(RowKey::new(i.to_be_bytes().to_vec())),
Value::Text(s) => Ok(RowKey::new(s.as_bytes().to_vec())),
Value::Float(f) => Ok(RowKey::new(f.to_be_bytes().to_vec())),
Value::Boolean(b) => Ok(RowKey::new(vec![u8::from(*b)])),
Value::Null => Ok(RowKey::new(vec![0])),
Value::Uuid(bytes) => Ok(RowKey::new(bytes.to_vec())),
Value::BigInt(i) => Ok(RowKey::new(i.to_be_bytes().to_vec())),
Value::Tuple(components) => {
let mut result = Vec::new();
for component in components {
let raw = self.value_to_raw_pk_bytes(component)?;
let len = raw.len();
if len > u16::MAX as usize {
return Err(Error::query_execution(
"Composite partition key component too large",
));
}
result.extend_from_slice(&(len as u16).to_be_bytes());
result.extend_from_slice(&raw);
result.push(0x00);
}
Ok(RowKey::new(result))
}
_ => Err(Error::query_execution("Cannot convert value to row key")),
}
}
fn value_to_raw_pk_bytes(&self, value: &Value) -> Result<Vec<u8>> {
match value {
Value::Integer(i) => Ok(i.to_be_bytes().to_vec()),
Value::Text(s) => Ok(s.as_bytes().to_vec()),
Value::Float(f) => Ok(f.to_be_bytes().to_vec()),
Value::Boolean(b) => Ok(vec![u8::from(*b)]),
Value::Null => Ok(Vec::new()),
Value::Uuid(bytes) => Ok(bytes.to_vec()),
Value::BigInt(i) => Ok(i.to_be_bytes().to_vec()),
_ => Err(Error::query_execution(
"Cannot serialize value as partition key component",
)),
}
}
/// Derives the row key a condition points at.
///
/// An integer condition on the `id` column maps to the textual key `user_key_<id>`, the
/// same key format `execute_insert_operation` writes. Every other condition is encoded
/// with `value_to_row_key`.
///
/// # Errors
/// Propagates the error from `value_to_row_key` for values it cannot encode.
fn condition_to_row_key(&self, condition: &Condition) -> Result<RowKey> {
    match (condition.column.as_str(), &condition.value) {
        ("id", Value::Integer(id)) => Ok(RowKey::new(format!("user_key_{}", id).into_bytes())),
        (_, other) => self.value_to_row_key(other),
    }
}
/// Turns a stored value into a query row keyed by `key`.
///
/// * A `Map` contributes one column per entry whose key is text; entries with other key
///   kinds are dropped.
/// * Any other value is exposed as a single column named `data`.
/// * If that leaves no columns, a single `id` column holding the debug rendering of `key`
///   is added.
///
/// The visible code never takes the error path; the `Result` is part of the signature
/// callers rely on.
fn storage_data_to_query_row(&self, data: Value, key: &RowKey) -> Result<QueryRow> {
    let mut columns: HashMap<String, Value> = match data {
        Value::Map(entries) => {
            let mut named = HashMap::new();
            for (field, cell) in entries {
                match field {
                    Value::Text(name) => {
                        named.insert(name, cell);
                    }
                    _ => continue,
                }
            }
            named
        }
        scalar => HashMap::from([("data".to_string(), scalar)]),
    };
    if columns.is_empty() {
        columns.insert("id".to_string(), Value::Text(format!("{:?}", key)));
    }
    Ok(QueryRow::with_values(key.clone(), columns))
}
/// Executes the `Insert` steps of `plan`, writing one row per step (experimental feature only).
///
/// For each `Insert` step the step's conditions are used as the column/value pairs of the
/// new row. The row key is `user_key_<id>` when the step has an integer `id` condition,
/// otherwise `test_key_<n>` where `n` is the number of rows inserted so far.
///
/// Returns a result with no rows and `rows_affected` set to the number of rows written.
///
/// # Errors
/// Fails when the plan has no table and propagates storage `put` errors.
// NOTE(review): placeholder data ("TestUser…", "DefaultUser", "default_test_key") is written
// when a step has no conditions or no step inserted anything — looks like test scaffolding;
// confirm it is intended in this code path.
#[cfg(feature = "experimental")]
async fn execute_insert_operation(&self, plan: &QueryPlan) -> Result<QueryResult> {
let table_id = self
.require_table(plan)
.map_err(|_| Error::query_execution("No table specified in INSERT plan"))?;
let mut inserted_count: u64 = 0;
for step in &plan.steps {
// Only Insert steps produce rows; every other step type is skipped.
if !matches!(step.step_type, StepType::Insert) {
continue;
}
#[cfg(debug_assertions)]
eprintln!("DEBUG: INSERT step conditions: {:?}", step.conditions);
// Fallback key; replaced below when the step carries an integer `id`.
let mut key_value = format!("test_key_{}", inserted_count);
for condition in &step.conditions {
if condition.column == "id" {
if let Value::Integer(id) = &condition.value {
// Same key format that `condition_to_row_key` derives for `id` lookups.
key_value = format!("user_key_{}", id);
break;
}
}
}
#[cfg(debug_assertions)]
eprintln!("DEBUG: Using row key: {}", key_value);
let row_key = RowKey::new(key_value.into_bytes());
// The step's conditions double as the column -> value assignments of the new row.
let mut value_map: HashMap<String, Value> = step
.conditions
.iter()
.map(|c| (c.column.clone(), c.value.clone()))
.collect();
// A step without conditions gets placeholder `id` / `name` columns.
if value_map.is_empty() {
value_map.insert("id".to_string(), Value::Integer(inserted_count as i32 + 1));
value_map.insert(
"name".to_string(),
Value::Text(format!("TestUser{}", inserted_count + 1)),
);
}
let row_value = map_to_value(value_map);
self.storage.put(table_id, row_key, row_value).await?;
inserted_count += 1;
#[cfg(debug_assertions)]
eprintln!(
"DEBUG: execute_insert_operation - stored row {} in table {}",
inserted_count, table_id
);
}
// No Insert step wrote anything: store a single placeholder row and report one row affected.
if inserted_count == 0 {
let row_key = RowKey::new(b"default_test_key".to_vec());
let mut value_map = HashMap::new();
value_map.insert("id".to_string(), Value::Integer(1));
value_map.insert("name".to_string(), Value::Text("DefaultUser".to_string()));
self.storage
.put(table_id, row_key, map_to_value(value_map))
.await?;
inserted_count = 1;
}
#[cfg(debug_assertions)]
eprintln!(
"DEBUG: execute_insert_operation called, returning rows_affected: {}",
inserted_count
);
// `execution_time_ms` is left at 0 here; `execute` overwrites it with the measured time.
Ok(QueryResult {
rows: vec![],
rows_affected: inserted_count,
execution_time_ms: 0,
metadata: super::result::QueryMetadata::default(),
})
}
/// Handles plans that `execute` classifies as CREATE TABLE.
///
/// The plan is ignored and nothing is written here: the result has no rows, zero
/// `rows_affected` and default metadata.
// NOTE(review): presumably the table itself is created elsewhere — confirm.
async fn execute_create_table_operation(&self, _plan: &QueryPlan) -> Result<QueryResult> {
Ok(QueryResult {
rows: vec![],
rows_affected: 0,
// `execute` overwrites this with the measured time.
execution_time_ms: 0,
metadata: super::result::QueryMetadata::default(),
})
}
}
/// Packs a column-name → value map into a `Value::Map` whose keys are `Value::Text`.
#[cfg(feature = "experimental")]
fn map_to_value(map: HashMap<String, Value>) -> Value {
    let entries = map
        .into_iter()
        .map(|(column, cell)| (Value::Text(column), cell));
    Value::Map(entries.collect())
}
// Unit tests for the executor's pure helpers (value comparison, condition evaluation,
// row-key derivation). Each test builds a real executor over a temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use crate::Config;
use std::sync::Arc;
use tempfile::TempDir;
// Builds an executor backed by a storage engine and schema manager opened in a fresh
// temporary directory. The `TempDir` is returned so the caller can keep it alive for the
// duration of the test.
async fn make_executor() -> (TempDir, QueryExecutor, Config) {
let temp_dir = TempDir::new().unwrap();
let config = Config::default();
let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
let storage = Arc::new(
crate::storage::StorageEngine::open(
temp_dir.path(),
&config,
platform,
// `open` takes one extra argument when the `state_machine` feature is enabled.
#[cfg(feature = "state_machine")]
None,
)
.await
.unwrap(),
);
let schema = Arc::new(
crate::schema::SchemaManager::new(temp_dir.path())
.await
.unwrap(),
);
let executor = QueryExecutor::new(storage, schema, &config);
(temp_dir, executor, config)
}
// The executor keeps a copy of the configuration it was constructed with.
#[tokio::test]
async fn test_query_executor_creation() {
let (_tmp, executor, config) = make_executor().await;
assert_eq!(
executor._config.query.query_parallelism,
config.query.query_parallelism
);
}
// Integers and text values of the same kind are ordered naturally.
#[tokio::test]
async fn test_value_comparison() {
let (_tmp, executor, _) = make_executor().await;
let result = executor
.compare_values(&Value::Integer(10), &Value::Integer(20))
.unwrap();
assert_eq!(result, Ordering::Less);
let result = executor
.compare_values(
&Value::Text("apple".to_string()),
&Value::Text("banana".to_string()),
)
.unwrap();
assert_eq!(result, Ordering::Less);
}
// `Equal` matches an identical integer; `Like` matches when the row text contains the pattern.
#[tokio::test]
async fn test_condition_evaluation() {
let (_tmp, executor, _) = make_executor().await;
let mut row_values = HashMap::new();
row_values.insert("id".to_string(), Value::Integer(1));
row_values.insert("name".to_string(), Value::Text("test".to_string()));
let row = QueryRow::with_values(RowKey::new(vec![1]), row_values);
let condition = Condition {
column: "id".to_string(),
operator: ComparisonOperator::Equal,
value: Value::Integer(1),
};
assert!(executor.evaluate_condition(&row, &condition).unwrap());
let condition = Condition {
column: "name".to_string(),
operator: ComparisonOperator::Like,
value: Value::Text("test".to_string()),
};
assert!(executor.evaluate_condition(&row, &condition).unwrap());
}
// Integer `id` conditions map to `user_key_<id>`; other conditions use the raw value bytes.
#[tokio::test]
async fn test_condition_to_row_key_mapping() {
let (_tmp, executor, _) = make_executor().await;
let id_condition = Condition {
column: "id".to_string(),
operator: ComparisonOperator::Equal,
value: Value::Integer(42),
};
let key = executor
.condition_to_row_key(&id_condition)
.expect("id condition key");
assert_eq!(std::str::from_utf8(key.as_bytes()).unwrap(), "user_key_42");
let name_condition = Condition {
column: "username".to_string(),
operator: ComparisonOperator::Equal,
value: Value::Text("carol".to_string()),
};
let key = executor
.condition_to_row_key(&name_condition)
.expect("fallback key");
assert_eq!(key.as_bytes(), b"carol");
}
}