use std::collections::{HashMap, BTreeMap};
/// SQL join variants the planner can record in a plan.
///
/// NOTE(review): these mirror standard SQL JOIN semantics by name only;
/// `DistributedExecutor` does not currently execute joins.
#[derive(Debug, Clone, PartialEq)]
pub enum JoinType {
Inner,
Left,
Right,
Full,
}
/// Aggregation functions that can appear in a SELECT column.
///
/// NOTE(review): only `Count` is actually computed by the executor's
/// group-by path; the other variants are stubbed to "0" there.
#[derive(Debug, Clone, PartialEq)]
pub enum AggFunc {
Count,
Sum,
Avg,
Min,
Max,
}
/// A parsed SQL-like query: the input to `QueryOptimizer::optimize`.
#[derive(Debug, Clone)]
pub struct AdvancedQuery {
// Columns (and optional aggregations/aliases) in the SELECT list.
pub select_cols: Vec<SelectCol>,
// Primary table named in FROM.
pub from_table: String,
pub joins: Vec<JoinClause>,
pub where_filters: Vec<FilterClause>,
pub group_by: Vec<String>,
// NOTE(review): `having` is never read by QueryOptimizer::optimize —
// HAVING clauses are silently dropped from the plan. Confirm intent.
pub having: Vec<FilterClause>,
pub order_by: Vec<OrderByClause>,
// Row cap and starting offset; both optional.
pub limit: Option<usize>,
pub offset: Option<usize>,
}
/// One entry of a SELECT list: a column, optionally aggregated/aliased.
#[derive(Debug, Clone)]
pub struct SelectCol {
pub column: String,
pub aggregation: Option<AggFunc>,
// NOTE(review): `alias` is not consumed anywhere in this file.
pub alias: Option<String>,
}
/// A single JOIN: target table, join kind, and the ON condition.
#[derive(Debug, Clone)]
pub struct JoinClause {
pub table: String,
pub join_type: JoinType,
pub on_condition: FilterClause,
}
/// A binary predicate `left <operator> right`, used for WHERE, HAVING
/// and JOIN-ON conditions.
///
/// `left` names a column; `right` may name a column or hold a literal
/// (the file's own tests use literals such as "100").
#[derive(Debug, Clone)]
pub struct FilterClause {
pub left: String,
// Comparison operator as text, e.g. "=", ">", "<".
pub operator: String,
pub right: String,
// NOTE(review): `logic_op` (AND/OR chaining, presumably) is never read
// in this file — filters are implicitly AND-ed by sequential application.
pub logic_op: Option<String>, }
/// One ORDER BY key: column name plus sort direction.
#[derive(Debug, Clone)]
pub struct OrderByClause {
pub column: String,
// true = descending, false = ascending.
pub descending: bool,
}
/// Output of the optimizer: a linear sequence of operations plus
/// plan-level metadata.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
// Short time-derived identifier ("plan_<hex>").
pub plan_id: String,
// Operations in execution order (Scan first).
pub operations: Vec<Operation>,
// Heuristic cardinality estimate (see `estimate_rows`).
pub estimated_rows: u64,
// Set when the query has joins or a GROUP BY.
pub distributed: bool,
}
/// One step of an execution plan.
///
/// NOTE(review): `DistributedExecutor` only handles Filter, GroupBy,
/// OrderBy and Limit; Scan, Join and Aggregate fall into its catch-all
/// arm and are ignored at execution time.
#[derive(Debug, Clone)]
pub enum Operation {
// Read `columns` from `table`.
Scan {
table: String,
columns: Vec<String>,
},
// Keep rows satisfying `condition`.
Filter {
condition: FilterClause,
},
// Join the current stream with `with_table` on `condition`.
Join {
with_table: String,
join_type: JoinType,
condition: FilterClause,
},
// Group by `columns`, computing `aggregations` per group.
GroupBy {
columns: Vec<String>,
aggregations: Vec<AggFunc>,
},
// Multi-key sort.
OrderBy {
columns: Vec<OrderByClause>,
},
// Take `count` rows after skipping `offset`.
Limit {
count: usize,
offset: Option<usize>,
},
// Stand-alone aggregation of a single column.
// NOTE(review): never emitted by QueryOptimizer::optimize.
Aggregate {
func: AggFunc,
column: String,
},
}
/// Turns an `AdvancedQuery` into a linear `ExecutionPlan`.
pub struct QueryOptimizer;

impl QueryOptimizer {
    /// Builds the plan in a fixed order:
    /// Scan -> Filter(s) -> Join(s) -> GroupBy -> OrderBy -> Limit.
    ///
    /// Filters are emitted before joins so predicates run as early as
    /// possible. The plan is flagged `distributed` whenever the query
    /// has joins or a GROUP BY clause.
    ///
    /// NOTE(review): `query.having` is not consulted here, so HAVING
    /// clauses never reach the plan — confirm whether that is intended.
    pub fn optimize(query: &AdvancedQuery) -> ExecutionPlan {
        let mut operations = Vec::new();

        // Scan projects only the columns named in the SELECT list.
        let scan_cols: Vec<String> = query
            .select_cols
            .iter()
            .map(|col| col.column.clone())
            .collect();
        operations.push(Operation::Scan {
            table: query.from_table.clone(),
            columns: scan_cols,
        });

        for filter in &query.where_filters {
            operations.push(Operation::Filter {
                condition: filter.clone(),
            });
        }

        for join in &query.joins {
            operations.push(Operation::Join {
                with_table: join.table.clone(),
                join_type: join.join_type.clone(),
                condition: join.on_condition.clone(),
            });
        }

        if !query.group_by.is_empty() {
            // Aggregations come from the SELECT list entries that carry one.
            let aggs: Vec<AggFunc> = query
                .select_cols
                .iter()
                .filter_map(|col| col.aggregation.clone())
                .collect();
            operations.push(Operation::GroupBy {
                columns: query.group_by.clone(),
                aggregations: aggs,
            });
        }

        if !query.order_by.is_empty() {
            operations.push(Operation::OrderBy {
                columns: query.order_by.clone(),
            });
        }

        // Idiomatic `if let` replaces the previous `is_some()` check
        // followed by `unwrap_or(0)` on the same Option.
        if let Some(count) = query.limit {
            operations.push(Operation::Limit {
                count,
                offset: query.offset,
            });
        }

        ExecutionPlan {
            plan_id: format!("plan_{}", uuid_short()),
            operations,
            estimated_rows: estimate_rows(query),
            distributed: !query.joins.is_empty() || !query.group_by.is_empty(),
        }
    }

    /// Public wrapper over the free-function cardinality heuristic.
    pub fn estimate_row_count(query: &AdvancedQuery) -> u64 {
        estimate_rows(query)
    }
}
/// Runs an `ExecutionPlan` independently over row partitions and merges
/// the per-partition results.
pub struct DistributedExecutor;

impl DistributedExecutor {
    /// Applies the plan's operations, partition by partition.
    ///
    /// Supported operations: Filter, GroupBy, OrderBy, Limit. Scan,
    /// Join and Aggregate are ignored (the partitions are assumed to
    /// already hold the scanned table's rows).
    ///
    /// Bug fixes vs. the previous version:
    /// - `¤t_data` mojibake restored to `&current_data` (the file
    ///   did not compile as written).
    /// - When a GroupBy ran, BOTH the raw rows and the grouped rows were
    ///   appended to the result, duplicating output; now only the
    ///   grouped rows survive for that partition.
    ///
    /// NOTE(review): OrderBy/Limit run per partition, so the merged
    /// result is not globally ordered or limited — confirm whether a
    /// final merge pass is expected elsewhere.
    pub fn execute_distributed(
        plan: &ExecutionPlan,
        data_partitions: Vec<Vec<HashMap<String, String>>>,
    ) -> QueryResult {
        let mut results = Vec::new();
        for partition in data_partitions {
            let mut current_data = partition;
            // Set when a GroupBy runs; it replaces the row stream for
            // this partition and terminates the operation loop.
            let mut grouped: Option<Vec<HashMap<String, String>>> = None;
            for operation in &plan.operations {
                match operation {
                    Operation::Filter { condition } => {
                        current_data = Self::apply_filter(&current_data, condition);
                    }
                    Operation::GroupBy { columns, aggregations } => {
                        grouped = Some(Self::apply_groupby(&current_data, columns, aggregations));
                        break;
                    }
                    Operation::OrderBy { columns } => {
                        current_data = Self::apply_orderby(&current_data, columns);
                    }
                    Operation::Limit { count, offset } => {
                        current_data = Self::apply_limit(&current_data, *count, *offset);
                    }
                    // Scan/Join/Aggregate are not executed here.
                    _ => {}
                }
            }
            // Exactly one of the two streams contributes per partition.
            results.extend(grouped.unwrap_or(current_data));
        }
        let rows_processed = results.len() as u64;
        QueryResult {
            rows: results,
            execution_time_ms: 0.0,
            rows_processed,
        }
    }

    /// Keeps the rows satisfying `condition`.
    ///
    /// The right-hand side resolves to a column when the row has one by
    /// that name, otherwise it is treated as a literal. (Previously a
    /// literal RHS such as "100" matched nothing, because it was only
    /// ever looked up as a column.) Rows missing the left-hand column
    /// are dropped, as before.
    fn apply_filter(
        data: &[HashMap<String, String>],
        condition: &FilterClause,
    ) -> Vec<HashMap<String, String>> {
        data.iter()
            .filter(|row| {
                let left_val = match row.get(&condition.left) {
                    Some(v) => v.as_str(),
                    None => return false,
                };
                let right_val = row
                    .get(&condition.right)
                    .map(String::as_str)
                    .unwrap_or(condition.right.as_str());
                Self::compare_values(left_val, right_val, &condition.operator)
            })
            .cloned()
            .collect()
    }

    /// Evaluates `left <op> right` on two cell values.
    ///
    /// When both sides parse as f64 the comparison is numeric (so
    /// "9" < "10"); otherwise it is lexicographic, matching the old
    /// string-compare behavior. Unknown operators keep the row, which
    /// preserves the original permissive fallback.
    fn compare_values(left: &str, right: &str, op: &str) -> bool {
        use std::cmp::Ordering;
        let ord = match (left.parse::<f64>(), right.parse::<f64>()) {
            (Ok(l), Ok(r)) => l.partial_cmp(&r).unwrap_or(Ordering::Equal),
            _ => left.cmp(right),
        };
        match op {
            "=" => ord == Ordering::Equal,
            "!=" => ord != Ordering::Equal,
            ">" => ord == Ordering::Greater,
            ">=" => ord != Ordering::Less,
            "<" => ord == Ordering::Less,
            "<=" => ord != Ordering::Greater,
            _ => true,
        }
    }

    /// Groups rows by `group_cols`, emitting one row per group with the
    /// group-key columns plus one entry per aggregation (keyed by the
    /// Debug name of the AggFunc).
    ///
    /// Only Count is computed; Sum/Avg/Min/Max are stubbed to "0"
    /// because `Operation::GroupBy` does not carry the column to
    /// aggregate. BTreeMap keeps group output order deterministic.
    fn apply_groupby(
        data: &[HashMap<String, String>],
        group_cols: &[String],
        aggs: &[AggFunc],
    ) -> Vec<HashMap<String, String>> {
        let mut groups: BTreeMap<Vec<String>, Vec<HashMap<String, String>>> = BTreeMap::new();
        for row in data {
            let key: Vec<String> = group_cols
                .iter()
                .filter_map(|col| row.get(col).cloned())
                .collect();
            groups.entry(key).or_insert_with(Vec::new).push(row.clone());
        }
        let mut results = Vec::new();
        for (_key, group) in groups {
            let mut result = HashMap::new();
            // Carry the group-key columns through from the first row.
            for col in group_cols.iter() {
                if let Some(first_row) = group.first() {
                    if let Some(val) = first_row.get(col) {
                        result.insert(col.clone(), val.clone());
                    }
                }
            }
            for agg in aggs {
                let agg_result = match agg {
                    AggFunc::Count => group.len().to_string(),
                    // TODO: real Sum/Avg/Min/Max need the target column,
                    // which Operation::GroupBy does not currently carry.
                    AggFunc::Sum => "0".to_string(),
                    AggFunc::Avg => "0".to_string(),
                    AggFunc::Min => "0".to_string(),
                    AggFunc::Max => "0".to_string(),
                };
                result.insert(format!("{:?}", agg), agg_result);
            }
            results.push(result);
        }
        results
    }

    /// Multi-key sort: applies the keys in reverse so that, because
    /// `sort_by` is stable, the first ORDER BY key ends up dominant.
    /// Rows missing the sort column compare as the empty string.
    fn apply_orderby(
        data: &[HashMap<String, String>],
        order_cols: &[OrderByClause],
    ) -> Vec<HashMap<String, String>> {
        let mut result = data.to_vec();
        for order in order_cols.iter().rev() {
            result.sort_by(|a, b| {
                let a_val = a.get(&order.column).map(|s| s.as_str()).unwrap_or("");
                let b_val = b.get(&order.column).map(|s| s.as_str()).unwrap_or("");
                if order.descending {
                    b_val.cmp(a_val)
                } else {
                    a_val.cmp(b_val)
                }
            });
        }
        result
    }

    /// Skips `offset` rows (default 0) then takes at most `count`.
    fn apply_limit(
        data: &[HashMap<String, String>],
        count: usize,
        offset: Option<usize>,
    ) -> Vec<HashMap<String, String>> {
        let start = offset.unwrap_or(0);
        data.iter()
            .skip(start)
            .take(count)
            .cloned()
            .collect()
    }
}
/// Result of executing a plan: the rows plus execution metadata.
#[derive(Debug, Clone)]
pub struct QueryResult {
pub rows: Vec<HashMap<String, String>>,
// NOTE(review): always 0.0 at the only construction site — timing is
// not actually measured yet.
pub execution_time_ms: f64,
// Number of rows in the merged result.
pub rows_processed: u64,
}
/// Produces a short, time-derived hexadecimal identifier.
///
/// Not a real UUID: the value is the current Unix timestamp in
/// nanoseconds reduced modulo 0xFFFFFF, so it is at most six hex digits
/// and calls landing close together can collide.
fn uuid_short() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let elapsed = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    format!("{:x}", elapsed.as_nanos() % 0xFFFFFF)
}
/// Rough cardinality heuristic for a query.
///
/// Starts from a fixed 10M-row base and assumes each WHERE filter keeps
/// half the rows, i.e. 0.5^n overall selectivity. Joins, GROUP BY and
/// LIMIT are not modeled.
fn estimate_rows(query: &AdvancedQuery) -> u64 {
    const BASE_ROWS: f64 = 10_000_000.0;
    // 0.5 is a power of two, so powi here is bit-identical to the
    // repeated multiplication it replaces.
    let selectivity = 0.5_f64.powi(query.where_filters.len() as i32);
    (BASE_ROWS * selectivity) as u64
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_query_optimizer() {
let query = AdvancedQuery {
select_cols: vec![SelectCol {
column: "id".to_string(),
aggregation: None,
alias: None,
}],
from_table: "users".to_string(),
joins: vec![],
where_filters: vec![],
group_by: vec![],
having: vec![],
order_by: vec![],
limit: Some(100),
offset: None,
};
let plan = QueryOptimizer::optimize(&query);
assert!(!plan.operations.is_empty());
}
#[test]
fn test_row_estimation() {
let query = AdvancedQuery {
select_cols: vec![],
from_table: "table".to_string(),
joins: vec![],
where_filters: vec![FilterClause {
left: "col".to_string(),
operator: ">".to_string(),
right: "100".to_string(),
logic_op: None,
}],
group_by: vec![],
having: vec![],
order_by: vec![],
limit: None,
offset: None,
};
let estimate = QueryOptimizer::estimate_row_count(&query);
assert!(estimate > 0);
assert!(estimate < 10_000_000);
}
#[test]
fn test_execution_plan_distributed() {
let query = AdvancedQuery {
select_cols: vec![],
from_table: "table".to_string(),
joins: vec![JoinClause {
table: "other".to_string(),
join_type: JoinType::Inner,
on_condition: FilterClause {
left: "id".to_string(),
operator: "=".to_string(),
right: "other_id".to_string(),
logic_op: None,
},
}],
where_filters: vec![],
group_by: vec![],
having: vec![],
order_by: vec![],
limit: None,
offset: None,
};
let plan = QueryOptimizer::optimize(&query);
assert!(plan.distributed);
}
#[test]
fn test_distributed_executor() {
let mut partition = Vec::new();
let mut row = HashMap::new();
row.insert("id".to_string(), "1".to_string());
row.insert("name".to_string(), "Alice".to_string());
partition.push(row);
let plan = ExecutionPlan {
plan_id: "test".to_string(),
operations: vec![Operation::Limit {
count: 10,
offset: None,
}],
estimated_rows: 1,
distributed: false,
};
let result = DistributedExecutor::execute_distributed(&plan, vec![partition]);
assert_eq!(result.rows.len(), 1);
}
#[test]
fn test_join_type_variants() {
assert_ne!(JoinType::Inner, JoinType::Left);
assert_ne!(JoinType::Left, JoinType::Right);
}
#[test]
fn test_agg_func_variants() {
assert_ne!(AggFunc::Count, AggFunc::Sum);
assert_eq!(AggFunc::Count, AggFunc::Count);
}
}