use super::filter::Filter;
use super::filter_compiled::CompiledFilter;
use super::sort::{OrderBy, QueryLimits};
use crate::storage::schema::{Row, Value};
use std::collections::HashMap;
/// Builds a lookup table from column name to its position in `columns`.
///
/// If a name appears more than once, the entry for the later position
/// overwrites the earlier one.
#[inline]
fn build_schema_index(columns: &[String]) -> HashMap<String, usize> {
    let mut index = HashMap::with_capacity(columns.len());
    for (position, name) in columns.iter().enumerate() {
        index.insert(name.clone(), position);
    }
    index
}
/// The rows produced by a query together with their column names and
/// execution statistics.
#[derive(Debug, Clone)]
pub struct QueryResult {
/// Column names; the position of a name here is the index used to read the
/// matching value out of each row (see `column_index` / `get_value`).
pub columns: Vec<String>,
/// The result rows.
pub rows: Vec<Row>,
/// Optional row total. `empty` and `from_rows` set it to `Some(rows.len())`;
/// `MemoryExecutor::execute` sets it to the number of rows that matched the
/// filter (before limits are applied) when the plan requests it, and to
/// `None` otherwise.
pub total_count: Option<usize>,
/// Statistics gathered while producing this result.
pub stats: QueryStats,
}
impl QueryResult {
    /// Creates a result with no columns and no rows; `total_count` is `Some(0)`
    /// and the stats are all defaults.
    pub fn empty() -> Self {
        Self::from_rows(Vec::new(), Vec::new())
    }

    /// Wraps `rows` under the given `columns`, recording the row count as
    /// `total_count` and using default stats.
    pub fn from_rows(columns: Vec<String>, rows: Vec<Row>) -> Self {
        let total_count = Some(rows.len());
        Self {
            columns,
            rows,
            total_count,
            stats: QueryStats::default(),
        }
    }

    /// Number of rows held by this result.
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// Returns `true` when the result holds no rows.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Position of the first column called `name`, or `None` if there is none.
    pub fn column_index(&self, name: &str) -> Option<usize> {
        self.columns
            .iter()
            .enumerate()
            .find(|(_, candidate)| candidate.as_str() == name)
            .map(|(idx, _)| idx)
    }

    /// Looks up the value at row `row_idx` in the column called `column`.
    ///
    /// Returns `None` when the column is unknown, the row index is out of
    /// range, or the row has no value at that column position.
    pub fn get_value(&self, row_idx: usize, column: &str) -> Option<&Value> {
        self.column_index(column).and_then(|col_idx| {
            self.rows
                .get(row_idx)
                .and_then(|row| row.get(col_idx))
        })
    }

    /// Iterates over the rows as [`RowView`]s that can be queried by column
    /// name.
    pub fn iter_rows(&self) -> impl Iterator<Item = RowView<'_>> {
        let columns = self.columns.as_slice();
        self.rows.iter().map(move |row| RowView { columns, row })
    }
}
/// Borrowed view of one result row paired with the result's column names,
/// allowing values to be looked up by name.
///
/// The view only holds two shared references, so it is cheap to copy.
#[derive(Debug, Clone, Copy)]
pub struct RowView<'a> {
    /// Column names, positionally aligned with the row's values.
    columns: &'a [String],
    /// The underlying row.
    row: &'a Row,
}

impl<'a> RowView<'a> {
    /// Returns the value in the column called `column`.
    ///
    /// Yields `None` when no column has that name or the row has no value at
    /// that position. The returned reference borrows from the underlying
    /// result (lifetime `'a`), not from this view, so it may outlive the view.
    pub fn get(&self, column: &str) -> Option<&'a Value> {
        let idx = self.columns.iter().position(|c| c == column)?;
        self.row.get(idx)
    }

    /// Returns the value at position `idx`, or `None` if the row has no value
    /// there.
    pub fn get_index(&self, idx: usize) -> Option<&'a Value> {
        self.row.get(idx)
    }

    /// Returns the row's values as exposed by `Row::values`.
    pub fn values(&self) -> &'a [Value] {
        self.row.values()
    }
}
/// Counters and timing describing how a query was executed.
#[derive(Debug, Clone, Default)]
pub struct QueryStats {
/// Number of rows examined; `MemoryExecutor::execute` sets this to the
/// table's full row count.
pub rows_scanned: usize,
/// Number of rows that passed the filter (before ordering and limits).
pub rows_matched: usize,
/// Number of rows present in the final result.
pub rows_returned: usize,
/// Elapsed execution time in microseconds.
pub execution_time_us: u64,
/// Presumably the name of an index used to answer the query; nothing in this
/// file assigns it, so it keeps its default of `None` — TODO confirm intent.
pub index_used: Option<String>,
}
/// Declarative description of a query against a single table.
#[derive(Debug, Clone)]
pub struct QueryPlan {
/// Name of the table to read from.
pub table: String,
/// Columns to project; `MemoryExecutor::execute` treats an empty list as
/// "return every column of the table".
pub columns: Vec<String>,
/// Optional row filter; `None` means every row matches.
pub filter: Option<Filter>,
/// Ordering applied to the matching rows.
pub order_by: OrderBy,
/// Limit/offset settings applied after ordering.
pub limits: QueryLimits,
/// When `true`, the executor reports the number of matching rows in
/// `QueryResult::total_count`.
pub count_total: bool,
}
impl QueryPlan {
    /// Starts a plan over `table` with no projection, no filter, an empty
    /// ordering, no limits and total counting disabled.
    pub fn new(table: impl Into<String>) -> Self {
        let table = table.into();
        Self {
            table,
            columns: Vec::new(),
            filter: None,
            order_by: OrderBy::new(),
            limits: QueryLimits::none(),
            count_total: false,
        }
    }

    /// Replaces the projected column list.
    pub fn select(self, columns: Vec<String>) -> Self {
        Self { columns, ..self }
    }

    /// Adds a filter. If the plan already has one, the two are combined with
    /// `Filter::and_filter`; otherwise `filter` becomes the plan's filter.
    pub fn filter(mut self, filter: Filter) -> Self {
        let combined = if let Some(existing) = self.filter.take() {
            existing.and_filter(filter)
        } else {
            filter
        };
        self.filter = Some(combined);
        self
    }

    /// Replaces the ordering.
    pub fn order_by(self, order: OrderBy) -> Self {
        Self {
            order_by: order,
            ..self
        }
    }

    /// Applies `QueryLimits::limit(n)` to the plan's current limits.
    pub fn limit(mut self, n: usize) -> Self {
        let updated = self.limits.limit(n);
        self.limits = updated;
        self
    }

    /// Applies `QueryLimits::offset(n)` to the plan's current limits.
    pub fn offset(mut self, n: usize) -> Self {
        let updated = self.limits.offset(n);
        self.limits = updated;
        self
    }

    /// Requests that the executor report the total number of matching rows.
    pub fn with_total_count(self) -> Self {
        Self {
            count_total: true,
            ..self
        }
    }
}
/// Fluent builder that wraps a [`QueryPlan`]; call `plan()` to extract the
/// finished plan.
#[derive(Debug, Clone)]
pub struct Query {
/// The plan being assembled by the builder methods.
plan: QueryPlan,
}
impl Query {
pub fn select(table: impl Into<String>) -> Self {
Self {
plan: QueryPlan::new(table),
}
}
pub fn columns(mut self, columns: Vec<impl Into<String>>) -> Self {
self.plan.columns = columns.into_iter().map(|c| c.into()).collect();
self
}
pub fn filter(mut self, filter: Filter) -> Self {
self.plan = self.plan.filter(filter);
self
}
pub fn order_by_asc(mut self, column: impl Into<String>) -> Self {
self.plan.order_by = self.plan.order_by.asc(column);
self
}
pub fn order_by_desc(mut self, column: impl Into<String>) -> Self {
self.plan.order_by = self.plan.order_by.desc(column);
self
}
pub fn limit(mut self, n: usize) -> Self {
self.plan = self.plan.limit(n);
self
}
pub fn offset(mut self, n: usize) -> Self {
self.plan = self.plan.offset(n);
self
}
pub fn with_total_count(mut self) -> Self {
self.plan = self.plan.with_total_count();
self
}
pub fn plan(self) -> QueryPlan {
self.plan
}
}
/// Interface for backends that can run a [`QueryPlan`] against named tables.
pub trait QueryExecutor {
/// Runs `plan` and returns its result, or a [`QueryError`] on failure.
fn execute(&self, plan: &QueryPlan) -> Result<QueryResult, QueryError>;
/// Counts the rows of `table`, restricted by `filter` when one is given.
fn count(&self, table: &str, filter: Option<&Filter>) -> Result<usize, QueryError>;
/// Reports whether a table called `table` is known to this executor.
fn table_exists(&self, table: &str) -> bool;
/// Returns the column names of `table`, or `None` if the table is unknown.
fn table_columns(&self, table: &str) -> Option<Vec<String>>;
}
/// Failures that query execution can report.
///
/// Every variant except `Timeout` carries a string that is appended to the
/// variant's message when the error is displayed.
#[derive(Debug, Clone)]
pub enum QueryError {
    TableNotFound(String),
    ColumnNotFound(String),
    InvalidFilter(String),
    TypeMismatch(String),
    StorageError(String),
    Timeout,
}

impl std::fmt::Display for QueryError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Timeout => f.write_str("Query timeout"),
            Self::TableNotFound(table) => write!(f, "Table not found: {}", table),
            Self::ColumnNotFound(column) => write!(f, "Column not found: {}", column),
            Self::InvalidFilter(detail) => write!(f, "Invalid filter: {}", detail),
            Self::TypeMismatch(detail) => write!(f, "Type mismatch: {}", detail),
            Self::StorageError(detail) => write!(f, "Storage error: {}", detail),
        }
    }
}

impl std::error::Error for QueryError {}
/// In-memory [`QueryExecutor`] that keeps every table in a hash map.
///
/// Derives `Debug` and `Clone` so the executor can be inspected and
/// snapshotted like the other public types in this module.
#[derive(Debug, Clone)]
pub struct MemoryExecutor {
    /// Table name mapped to that table's column names and its rows.
    tables: HashMap<String, (Vec<String>, Vec<Row>)>,
}
impl MemoryExecutor {
    /// Creates an executor that holds no tables.
    pub fn new() -> Self {
        let tables = HashMap::new();
        Self { tables }
    }

    /// Registers a table under `name`, replacing any table already stored
    /// under that name.
    pub fn add_table(&mut self, name: impl Into<String>, columns: Vec<String>, rows: Vec<Row>) {
        let key = name.into();
        self.tables.insert(key, (columns, rows));
    }

    /// Appends `row` to `table`.
    ///
    /// Returns `false` (dropping the row) when the table does not exist. The
    /// row is not checked against the table's columns.
    pub fn insert(&mut self, table: &str, row: Row) -> bool {
        match self.tables.get_mut(table) {
            Some((_, rows)) => {
                rows.push(row);
                true
            }
            None => false,
        }
    }

    /// Reads the value of `column` from `row`, yielding `Value::Null` when the
    /// column is unknown or the row has no value at that position.
    fn get_row_value(&self, row: &Row, columns: &[String], column: &str) -> Value {
        columns
            .iter()
            .position(|name| name == column)
            .and_then(|idx| row.get(idx).cloned())
            .unwrap_or(Value::Null)
    }
}
impl Default for MemoryExecutor {
fn default() -> Self {
Self::new()
}
}
/// Builds the row predicate shared by `execute` and `count`.
///
/// The filter is compiled once against the table schema. If compilation
/// fails, the predicate deliberately falls back to interpreting the `Filter`
/// directly, resolving each referenced column by name (a column that cannot
/// be resolved is reported to the filter as `None`). With no filter every row
/// matches.
fn row_matcher<'a>(
    columns: &'a [String],
    filter: Option<&'a Filter>,
) -> impl Fn(&Row) -> bool + 'a {
    let compiled = filter.and_then(|f| {
        let schema = build_schema_index(columns);
        CompiledFilter::compile(f, &schema).ok()
    });
    move |row: &Row| match (&compiled, filter) {
        (Some(compiled), _) => compiled.evaluate(row.values()),
        (None, Some(f)) => f.evaluate(&|col| {
            columns
                .iter()
                .position(|c| c == col)
                .and_then(|idx| row.get(idx).cloned())
        }),
        (None, None) => true,
    }
}

impl QueryExecutor for MemoryExecutor {
    /// Runs `plan` against the in-memory table it names.
    ///
    /// Steps, in order: filter rows, optionally record the matched total,
    /// sort, apply limits, then project the requested columns. An empty
    /// `plan.columns` returns every table column unchanged.
    ///
    /// # Errors
    ///
    /// * `QueryError::TableNotFound` if `plan.table` is not registered.
    /// * `QueryError::ColumnNotFound` for the first projected column that the
    ///   table does not have. This is checked before any row is scanned.
    fn execute(&self, plan: &QueryPlan) -> Result<QueryResult, QueryError> {
        let start = std::time::Instant::now();
        let (columns, rows) = self
            .tables
            .get(&plan.table)
            .ok_or_else(|| QueryError::TableNotFound(plan.table.clone()))?;

        // Resolve the projection first so an unknown column fails fast instead
        // of after filtering, cloning and sorting the whole table.
        let projection: Option<Vec<usize>> = if plan.columns.is_empty() {
            None
        } else {
            let indices = plan
                .columns
                .iter()
                .map(|wanted| {
                    columns
                        .iter()
                        .position(|have| have == wanted)
                        .ok_or_else(|| QueryError::ColumnNotFound(wanted.clone()))
                })
                .collect::<Result<Vec<usize>, QueryError>>()?;
            Some(indices)
        };

        let row_matches = row_matcher(columns, plan.filter.as_ref());
        let mut matched_rows: Vec<Row> = rows
            .iter()
            .filter(|&row| row_matches(row))
            .cloned()
            .collect();
        let rows_matched = matched_rows.len();

        // The total reflects matches before limit/offset are applied.
        let total_count = if plan.count_total {
            Some(rows_matched)
        } else {
            None
        };

        if !plan.order_by.is_empty() {
            plan.order_by.sort_rows(&mut matched_rows, |row, col| {
                self.get_row_value(row, columns, col)
            });
        }

        let limited_rows = plan.limits.apply(matched_rows);

        let (result_columns, result_rows) = match projection {
            None => (columns.clone(), limited_rows),
            Some(indices) => {
                let projected: Vec<Row> = limited_rows
                    .into_iter()
                    .map(|row| {
                        // A row shorter than the schema yields Null for the
                        // missing positions.
                        let values: Vec<Value> = indices
                            .iter()
                            .map(|&idx| row.get(idx).cloned().unwrap_or(Value::Null))
                            .collect();
                        Row::new(values)
                    })
                    .collect();
                (plan.columns.clone(), projected)
            }
        };

        let stats = QueryStats {
            rows_scanned: rows.len(),
            rows_matched,
            rows_returned: result_rows.len(),
            // Microseconds as u128 narrowed to u64; overflow would need an
            // elapsed time of several hundred thousand years.
            execution_time_us: start.elapsed().as_micros() as u64,
            ..Default::default()
        };

        Ok(QueryResult {
            columns: result_columns,
            rows: result_rows,
            total_count,
            stats,
        })
    }

    /// Counts the rows of `table` that satisfy `filter` (all rows when
    /// `filter` is `None`).
    ///
    /// # Errors
    ///
    /// `QueryError::TableNotFound` if `table` is not registered.
    fn count(&self, table: &str, filter: Option<&Filter>) -> Result<usize, QueryError> {
        let (columns, rows) = self
            .tables
            .get(table)
            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
        let row_matches = row_matcher(columns, filter);
        Ok(rows.iter().filter(|&row| row_matches(row)).count())
    }

    /// Reports whether a table called `table` has been registered.
    fn table_exists(&self, table: &str) -> bool {
        self.tables.contains_key(table)
    }

    /// Returns a copy of the column names of `table`, or `None` if the table
    /// is not registered.
    fn table_columns(&self, table: &str) -> Option<Vec<String>> {
        self.tables.get(table).map(|(cols, _)| cols.clone())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a text `Value`.
    fn text(s: &str) -> Value {
        Value::text(s.to_string())
    }

    /// Builds one `employees` row in `(id, name, age, department)` order.
    fn employee(id: Value, name: &str, age: Value, department: &str) -> Row {
        Row::new(vec![id, text(name), age, text(department)])
    }

    /// Executor preloaded with a five-row `employees` table.
    fn create_test_executor() -> MemoryExecutor {
        let columns: Vec<String> = ["id", "name", "age", "department"]
            .iter()
            .map(|c| c.to_string())
            .collect();
        let rows = vec![
            employee(Value::Integer(1), "Alice", Value::Integer(30), "Engineering"),
            employee(Value::Integer(2), "Bob", Value::Integer(25), "Sales"),
            employee(Value::Integer(3), "Charlie", Value::Integer(35), "Engineering"),
            employee(Value::Integer(4), "Diana", Value::Integer(28), "HR"),
            employee(Value::Integer(5), "Eve", Value::Integer(32), "Engineering"),
        ];
        let mut executor = MemoryExecutor::new();
        executor.add_table("employees", columns, rows);
        executor
    }

    /// Runs `plan` against a fresh test executor, panicking on error.
    fn run(plan: QueryPlan) -> QueryResult {
        create_test_executor().execute(&plan).unwrap()
    }

    #[test]
    fn test_simple_select() {
        let result = run(QueryPlan::new("employees"));
        assert_eq!(result.len(), 5);
        assert_eq!(result.columns.len(), 4);
    }

    #[test]
    fn test_select_columns() {
        let wanted = vec!["name".to_string(), "age".to_string()];
        let result = run(QueryPlan::new("employees").select(wanted));
        assert_eq!(result.columns, vec!["name", "age"]);
        assert_eq!(result.rows[0].len(), 2);
    }

    #[test]
    fn test_filter_eq() {
        let in_engineering = Filter::eq("department", text("Engineering"));
        let result = run(QueryPlan::new("employees").filter(in_engineering));
        assert_eq!(result.len(), 3);
    }

    #[test]
    fn test_filter_gt() {
        let over_thirty = Filter::gt("age", Value::Integer(30));
        let result = run(QueryPlan::new("employees").filter(over_thirty));
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_filter_and() {
        let result = run(QueryPlan::new("employees")
            .filter(Filter::eq("department", text("Engineering")))
            .filter(Filter::ge("age", Value::Integer(30))));
        assert_eq!(result.len(), 3);
    }

    #[test]
    fn test_order_by() {
        let result = run(QueryPlan::new("employees").order_by(OrderBy::new().asc("age")));
        assert_eq!(result.get_value(0, "name"), Some(&text("Bob")));
        assert_eq!(result.get_value(4, "name"), Some(&text("Charlie")));
    }

    #[test]
    fn test_order_by_desc() {
        let result = run(QueryPlan::new("employees").order_by(OrderBy::new().desc("age")));
        assert_eq!(result.get_value(0, "name"), Some(&text("Charlie")));
        assert_eq!(result.get_value(4, "name"), Some(&text("Bob")));
    }

    #[test]
    fn test_limit() {
        let result = run(QueryPlan::new("employees").limit(2));
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_offset() {
        let result = run(QueryPlan::new("employees")
            .order_by(OrderBy::new().asc("id"))
            .offset(2));
        assert_eq!(result.len(), 3);
        assert_eq!(result.get_value(0, "id"), Some(&Value::Integer(3)));
    }

    #[test]
    fn test_limit_offset() {
        let result = run(QueryPlan::new("employees")
            .order_by(OrderBy::new().asc("id"))
            .offset(1)
            .limit(2));
        assert_eq!(result.len(), 2);
        assert_eq!(result.get_value(0, "id"), Some(&Value::Integer(2)));
        assert_eq!(result.get_value(1, "id"), Some(&Value::Integer(3)));
    }

    #[test]
    fn test_count() {
        let executor = create_test_executor();
        let everyone = executor.count("employees", None).unwrap();
        assert_eq!(everyone, 5);
        let in_engineering = Filter::eq("department", text("Engineering"));
        let engineers = executor.count("employees", Some(&in_engineering)).unwrap();
        assert_eq!(engineers, 3);
    }

    #[test]
    fn test_total_count() {
        let result = run(QueryPlan::new("employees").limit(2).with_total_count());
        assert_eq!(result.len(), 2);
        assert_eq!(result.total_count, Some(5));
    }

    #[test]
    fn test_table_not_found() {
        let executor = create_test_executor();
        let outcome = executor.execute(&QueryPlan::new("nonexistent"));
        assert!(matches!(outcome, Err(QueryError::TableNotFound(_))));
    }

    #[test]
    fn test_column_not_found() {
        let executor = create_test_executor();
        let wanted = vec!["name".to_string(), "nonexistent".to_string()];
        let outcome = executor.execute(&QueryPlan::new("employees").select(wanted));
        assert!(matches!(outcome, Err(QueryError::ColumnNotFound(_))));
    }

    #[test]
    fn test_query_builder() {
        let query = Query::select("employees")
            .columns(vec!["name", "age"])
            .filter(Filter::gt("age", Value::Integer(28)))
            .order_by_desc("age")
            .limit(3);
        let result = run(query.plan());
        assert_eq!(result.columns, vec!["name", "age"]);
        assert_eq!(result.len(), 3);
    }

    #[test]
    fn test_row_view() {
        let result = run(QueryPlan::new("employees")
            .order_by(OrderBy::new().asc("id"))
            .limit(1));
        for view in result.iter_rows() {
            assert_eq!(view.get("name"), Some(&text("Alice")));
            assert_eq!(view.get("age"), Some(&Value::Integer(30)));
        }
    }

    #[test]
    fn test_stats() {
        let result = run(QueryPlan::new("employees")
            .filter(Filter::eq("department", text("Engineering")))
            .limit(2));
        assert_eq!(result.stats.rows_scanned, 5);
        assert_eq!(result.stats.rows_matched, 3);
        assert_eq!(result.stats.rows_returned, 2);
    }

    #[test]
    fn test_insert() {
        let mut executor = create_test_executor();
        let frank = employee(Value::Integer(6), "Frank", Value::Integer(40), "Legal");
        assert!(executor.insert("employees", frank));
        let count = executor.count("employees", None).unwrap();
        assert_eq!(count, 6);
    }

    #[test]
    fn test_table_exists() {
        let executor = create_test_executor();
        assert!(executor.table_exists("employees"));
        assert!(!executor.table_exists("nonexistent"));
    }

    #[test]
    fn test_table_columns() {
        let executor = create_test_executor();
        let columns = executor.table_columns("employees").unwrap();
        assert_eq!(columns, vec!["id", "name", "age", "department"]);
        assert!(executor.table_columns("nonexistent").is_none());
    }
}