use std::collections::HashMap;
use std::sync::Arc;
use manifoldb_core::{Entity, Value};
use manifoldb_query::ast::DistanceMetric;
use manifoldb_query::ast::Literal;
use manifoldb_query::ast::Statement;
use manifoldb_query::exec::operators::{
BruteForceSearchOp, HashAggregateOp, HashJoinOp, NestedLoopJoinOp, SetOpOp, UnionOp, ValuesOp,
};
use manifoldb_query::exec::row::{Row, Schema};
use manifoldb_query::exec::{ExecutionContext, Operator, ResultSet};
use manifoldb_query::plan::logical::{AnnSearchNode, ExpandNode, PathScanNode, VectorDistanceNode};
use manifoldb_query::plan::logical::{
CreateCollectionNode, CreateIndexNode, CreateTableNode, DropCollectionNode, DropIndexNode,
DropTableNode, JoinType, LogicalExpr, SetOpNode, UnionNode,
};
use manifoldb_query::plan::physical::{IndexInfo, IndexType, PlannerCatalog};
use manifoldb_query::plan::{LogicalPlan, PhysicalPlan, PhysicalPlanner, PlanBuilder};
use manifoldb_query::ExtendedParser;
use manifoldb_storage::Transaction;
use crate::prepared::PreparedStatement;
use super::graph_accessor;
use super::StorageScan;
use crate::error::{Error, Result};
use crate::schema::SchemaManager;
use crate::transaction::DatabaseTransaction;
/// Runs a single read query (or `EXPLAIN`) with `params` bound positionally.
///
/// Convenience wrapper over [`execute_query_with_limit`] that passes `0` as the
/// `max_rows_in_memory` setting (presumably "no cap" — depends on
/// `ExecutionConfig`; confirm there).
pub fn execute_query<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    sql: &str,
    params: &[Value],
) -> Result<ResultSet> {
    let default_row_cap: usize = 0;
    execute_query_with_limit(tx, sql, params, default_row_cap)
}
pub fn execute_query_with_limit<T: Transaction>(
tx: &DatabaseTransaction<T>,
sql: &str,
params: &[Value],
max_rows_in_memory: usize,
) -> Result<ResultSet> {
execute_query_with_catalog(tx, sql, params, max_rows_in_memory, None)
}
pub fn execute_query_with_catalog<T: Transaction>(
tx: &DatabaseTransaction<T>,
sql: &str,
params: &[Value],
max_rows_in_memory: usize,
external_catalog: Option<PlannerCatalog>,
) -> Result<ResultSet> {
let stmt = ExtendedParser::parse_single(sql)?;
let (is_explain, inner_stmt) = match &stmt {
Statement::Explain(inner) => (true, inner.as_ref()),
other => (false, other),
};
let mut builder = PlanBuilder::new();
let logical_plan =
builder.build_statement(inner_stmt).map_err(|e| Error::Parse(e.to_string()))?;
let mut catalog = build_planner_catalog(tx)?;
if let Some(external) = external_catalog {
catalog = catalog.merge(external);
}
let planner = PhysicalPlanner::new().with_catalog(catalog);
let physical_plan = planner.plan(&logical_plan);
if is_explain {
return Ok(build_explain_result(&logical_plan, &physical_plan));
}
let ctx = create_context_with_limit(params, max_rows_in_memory);
execute_physical_plan(tx, &physical_plan, &logical_plan, &ctx)
}
fn build_explain_result(logical: &LogicalPlan, physical: &PhysicalPlan) -> ResultSet {
let schema = Arc::new(Schema::new(vec!["plan".to_string()]));
let mut rows = Vec::new();
rows.push(Row::new(schema.clone(), vec![Value::from("=== Logical Plan ===")]));
let logical_tree = logical.display_tree().to_string();
for line in logical_tree.lines() {
rows.push(Row::new(schema.clone(), vec![Value::from(line)]));
}
rows.push(Row::new(schema.clone(), vec![Value::from("")]));
rows.push(Row::new(schema.clone(), vec![Value::from("=== Physical Plan ===")]));
let physical_tree = physical.display_tree().to_string();
for line in physical_tree.lines() {
rows.push(Row::new(schema.clone(), vec![Value::from(line)]));
}
ResultSet::with_rows(schema, rows)
}
pub fn execute_statement<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
sql: &str,
params: &[Value],
) -> Result<u64> {
let stmt = ExtendedParser::parse_single(sql)?;
let mut builder = PlanBuilder::new();
let logical_plan = builder.build_statement(&stmt).map_err(|e| Error::Parse(e.to_string()))?;
let ctx = create_context(params);
match &logical_plan {
LogicalPlan::Insert { table, columns, input, .. } => {
execute_insert(tx, table, columns, input, &ctx)
}
LogicalPlan::Update { table, assignments, filter, .. } => {
execute_update(tx, table, assignments, filter, &ctx)
}
LogicalPlan::Delete { table, filter, .. } => execute_delete(tx, table, filter, &ctx),
LogicalPlan::CreateTable(node) => execute_create_table(tx, node),
LogicalPlan::DropTable(node) => execute_drop_table(tx, node),
LogicalPlan::CreateIndex(node) => execute_create_index(tx, node),
LogicalPlan::DropIndex(node) => execute_drop_index(tx, node),
LogicalPlan::CreateCollection(node) => execute_create_collection(tx, node),
LogicalPlan::DropCollection(node) => execute_drop_collection(tx, node),
_ => {
Err(Error::Execution("Expected DML or DDL statement".to_string()))
}
}
}
/// Executes an already-planned read query, binding `params` positionally.
///
/// The stored physical plan is tried first; the stored logical plan is the
/// fallback, exactly as for freshly parsed queries.
pub fn execute_prepared_query<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    stmt: &PreparedStatement,
    params: &[Value],
) -> Result<ResultSet> {
    let bound = create_context(params);
    let (physical, logical) = (stmt.physical_plan(), stmt.logical_plan());
    execute_physical_plan(tx, physical, logical, &bound)
}
pub fn execute_prepared_statement<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
stmt: &PreparedStatement,
params: &[Value],
) -> Result<u64> {
let ctx = create_context(params);
match stmt.logical_plan() {
LogicalPlan::Insert { table, columns, input, .. } => {
execute_insert(tx, table, columns, input, &ctx)
}
LogicalPlan::Update { table, assignments, filter, .. } => {
execute_update(tx, table, assignments, filter, &ctx)
}
LogicalPlan::Delete { table, filter, .. } => execute_delete(tx, table, filter, &ctx),
LogicalPlan::CreateTable(node) => execute_create_table(tx, node),
LogicalPlan::DropTable(node) => execute_drop_table(tx, node),
LogicalPlan::CreateIndex(node) => execute_create_index(tx, node),
LogicalPlan::DropIndex(node) => execute_drop_index(tx, node),
LogicalPlan::CreateCollection(node) => execute_create_collection(tx, node),
LogicalPlan::DropCollection(node) => execute_drop_collection(tx, node),
_ => {
Err(Error::Execution("Expected DML or DDL statement".to_string()))
}
}
}
/// Builds an execution context with `params` bound and `max_rows_in_memory`
/// set to `0`.
fn create_context(params: &[Value]) -> ExecutionContext {
    const DEFAULT_MAX_ROWS_IN_MEMORY: usize = 0;
    create_context_with_limit(params, DEFAULT_MAX_ROWS_IN_MEMORY)
}
/// Builds an execution context from positional parameters and a row cap.
///
/// `params[0]` is registered under key `1`, `params[1]` under `2`, and so on,
/// so placeholders are 1-based. `max_rows_in_memory` is forwarded to
/// `ExecutionConfig::with_max_rows_in_memory`.
fn create_context_with_limit(params: &[Value], max_rows_in_memory: usize) -> ExecutionContext {
    use manifoldb_query::exec::ExecutionConfig;

    let config = ExecutionConfig::new().with_max_rows_in_memory(max_rows_in_memory);
    let param_map: HashMap<_, _> = params
        .iter()
        .enumerate()
        .map(|(position, value)| ((position + 1) as u32, value.clone()))
        .collect();
    ExecutionContext::with_parameters(param_map).with_config(config)
}
/// Builds the physical planner's catalog from the index schemas stored in `tx`.
///
/// This is best-effort: if listing indexes fails the catalog is simply empty,
/// and an index whose schema cannot be loaded (error or missing) is skipped.
/// Only B-tree indexes are registered. An index counts as B-tree unless its
/// `USING` clause is exactly `hnsw`/`HNSW` or `hash`/`HASH`.
fn build_planner_catalog<T: Transaction>(tx: &DatabaseTransaction<T>) -> Result<PlannerCatalog> {
    let index_names = SchemaManager::list_indexes(tx).unwrap_or_default();

    let catalog = index_names
        .into_iter()
        .filter_map(|name| SchemaManager::get_index(tx, &name).ok().flatten())
        .filter(|schema| {
            let kind = match schema.using.as_deref() {
                Some("hnsw" | "HNSW") => IndexType::Hnsw,
                Some("hash" | "HASH") => IndexType::Hash,
                _ => IndexType::BTree,
            };
            kind == IndexType::BTree
        })
        .fold(PlannerCatalog::new(), |acc, schema| {
            let key_columns: Vec<String> = schema.columns.iter().map(|c| c.expr.clone()).collect();
            acc.with_index(IndexInfo::btree(&schema.name, &schema.table, key_columns))
        });

    Ok(catalog)
}
/// Tries to execute `physical` directly for the shapes that have a dedicated
/// physical path: `IndexScan`, `IndexRangeScan`, and `Filter` / `Project`
/// nodes stacked (recursively) on top of one of those.
///
/// Returns `Ok(Some(result))` when the plan bottoms out in an index scan, and
/// `Ok(None)` for any other shape so the caller can fall back to logical-plan
/// execution. `logical` is only threaded through the recursion and is not
/// inspected here.
///
/// # Errors
/// Propagates errors from `execute_index_scan` / `execute_index_range_scan`.
fn try_execute_from_physical<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    physical: &PhysicalPlan,
    logical: &LogicalPlan,
    ctx: &ExecutionContext,
) -> Result<Option<ResultSet>> {
    use super::index_scan::{execute_index_range_scan, execute_index_scan};
    match physical {
        // Index point lookup: materialize matching entities as rows with
        // `_rowid` plus every property name seen across the results.
        PhysicalPlan::IndexScan(scan_node) => {
            let entities = execute_index_scan(tx, scan_node, ctx)?;
            let columns = collect_all_columns(&entities);
            let scan = StorageScan::new(entities, columns);
            let schema = scan.schema();
            let rows = scan.collect_rows();
            Ok(Some(ResultSet::with_rows(schema, rows)))
        }
        // Index range lookup; same row materialization as the point lookup.
        PhysicalPlan::IndexRangeScan(scan_node) => {
            let entities = execute_index_range_scan(tx, scan_node, ctx)?;
            let columns = collect_all_columns(&entities);
            let scan = StorageScan::new(entities, columns);
            let schema = scan.schema();
            let rows = scan.collect_rows();
            Ok(Some(ResultSet::with_rows(schema, rows)))
        }
        // Filter over an index-backed input. A row is kept only when the
        // predicate evaluates to exactly `Value::Bool(true)`; NULL or
        // non-boolean results drop the row.
        PhysicalPlan::Filter { node, input } => {
            if let Some(result) = try_execute_from_physical(tx, input, logical, ctx)? {
                let schema = result.schema_arc();
                let filtered_rows: Vec<Row> = result
                    .into_rows()
                    .into_iter()
                    .filter(|row| {
                        let val = evaluate_row_expr(&node.predicate, row);
                        matches!(val, Value::Bool(true))
                    })
                    .collect();
                return Ok(Some(ResultSet::with_rows(schema, filtered_rows)));
            }
            // Input is not index-backed: let the caller use the logical path.
            Ok(None)
        }
        // Projection over an index-backed input. A `*` anywhere in the list
        // returns the input unchanged; otherwise each expression is evaluated
        // against the input row, resolving columns by name in the input schema.
        PhysicalPlan::Project { node, input } => {
            if let Some(result) = try_execute_from_physical(tx, input, logical, ctx)? {
                let has_wildcard = node.exprs.iter().any(|e| matches!(e, LogicalExpr::Wildcard));
                if has_wildcard {
                    return Ok(Some(result));
                }
                let projected_columns: Vec<String> =
                    node.exprs.iter().map(|e| expr_to_column_name(e)).collect();
                let new_schema = Arc::new(Schema::new(projected_columns.clone()));
                let result_schema = result.schema_arc();
                let rows: Vec<Row> = result
                    .rows()
                    .iter()
                    .map(|row| {
                        let values: Vec<Value> = node
                            .exprs
                            .iter()
                            .map(|expr| evaluate_expr_on_row(expr, row, &result_schema, ctx))
                            .collect();
                        Row::new(Arc::clone(&new_schema), values)
                    })
                    .collect();
                return Ok(Some(ResultSet::with_rows(new_schema, rows)));
            }
            // Input is not index-backed: let the caller use the logical path.
            Ok(None)
        }
        // Any other physical shape is not handled here.
        _ => {
            Ok(None)
        }
    }
}
/// Executes a planned read query and materializes the full result set.
///
/// The physical plan is tried first through `try_execute_from_physical`, which
/// handles index-scan-rooted plans. If that returns `Ok(None)`, execution falls
/// back to dispatching on the *logical* plan:
///
/// - `Project` over `Aggregate`: run the aggregate, then project its rows by
///   column name (a `*` returns the aggregate output as-is).
/// - `Project` whose input contains a join or a graph operator: delegated to
///   `execute_join_projection` / `execute_graph_projection`.
/// - Any other `Project`: evaluate the input to entities, then either expose
///   every collected column (`*`) or evaluate each expression per entity.
/// - `Aggregate`, `Join`, `Distinct`, `Union`, `SetOp`, `Expand`, `PathScan`,
///   `AnnSearch`, `VectorDistance`: delegated to their dedicated executors.
/// - Anything else: evaluated to entities and returned with all columns.
///
/// # Errors
/// Propagates errors from whichever executor handles the plan.
fn execute_physical_plan<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    physical: &PhysicalPlan,
    logical: &LogicalPlan,
    ctx: &ExecutionContext,
) -> Result<ResultSet> {
    // Index-backed physical plans short-circuit the logical fallback.
    if let Some(result) = try_execute_from_physical(tx, physical, logical, ctx)? {
        return Ok(result);
    }
    match logical {
        LogicalPlan::Project { node, input } => {
            // Projection directly over an aggregate: aggregate first, then pick
            // output columns by name from the aggregate's schema.
            if let LogicalPlan::Aggregate { node: agg_node, input: agg_input } = input.as_ref() {
                let result = execute_aggregate(tx, agg_node, agg_input, ctx)?;
                let has_wildcard = node.exprs.iter().any(|e| matches!(e, LogicalExpr::Wildcard));
                if has_wildcard {
                    return Ok(result);
                }
                let projected_columns: Vec<String> =
                    node.exprs.iter().map(|e| expr_to_column_name(e)).collect();
                let new_schema = Arc::new(Schema::new(projected_columns.clone()));
                let result_schema = result.schema_arc();
                let rows: Vec<Row> = result
                    .rows()
                    .iter()
                    .map(|row| {
                        let values: Vec<Value> = node
                            .exprs
                            .iter()
                            .map(|expr| evaluate_expr_on_row(expr, row, &result_schema, ctx))
                            .collect();
                        Row::new(Arc::clone(&new_schema), values)
                    })
                    .collect();
                return Ok(ResultSet::with_rows(new_schema, rows));
            }
            // Joins and graph traversals produce rows rather than entities, so
            // they have their own projection paths.
            if contains_join(input) {
                return execute_join_projection(tx, &node.exprs, input, ctx);
            }
            if contains_graph(input) {
                return execute_graph_projection(tx, &node.exprs, input, ctx);
            }
            let input_result = execute_logical_plan(tx, input, ctx)?;
            let has_wildcard = node.exprs.iter().any(|e| matches!(e, LogicalExpr::Wildcard));
            if has_wildcard {
                // `*`: `_rowid` plus every property name seen across the entities.
                let columns = collect_all_columns(&input_result);
                let scan = StorageScan::new(input_result, columns);
                let schema = scan.schema();
                let rows = scan.collect_rows();
                Ok(ResultSet::with_rows(schema, rows))
            } else {
                // Explicit list: evaluate every expression against each entity.
                let projected_columns: Vec<String> =
                    node.exprs.iter().map(|e| expr_to_column_name(e)).collect();
                let schema = Arc::new(Schema::new(projected_columns.clone()));
                let mut rows = Vec::new();
                for entity in &input_result {
                    let values: Vec<Value> =
                        node.exprs.iter().map(|expr| evaluate_expr(expr, entity, ctx)).collect();
                    rows.push(Row::new(Arc::clone(&schema), values));
                }
                Ok(ResultSet::with_rows(schema, rows))
            }
        }
        LogicalPlan::Aggregate { node, input } => {
            execute_aggregate(tx, node, input, ctx)
        }
        LogicalPlan::Join { node, left, right } => execute_join(tx, node, left, right, ctx),
        LogicalPlan::Distinct { input, .. } => {
            execute_distinct(tx, input, ctx)
        }
        LogicalPlan::Union { node, inputs } => {
            execute_union(tx, node, inputs, ctx)
        }
        LogicalPlan::SetOp { node, left, right } => {
            execute_set_op(tx, node, left, right, ctx)
        }
        LogicalPlan::Expand { node, input } => {
            execute_expand(tx, node, input, ctx)
        }
        LogicalPlan::PathScan { node, input } => {
            execute_path_scan(tx, node, input, ctx)
        }
        LogicalPlan::AnnSearch { node, input } => {
            execute_ann_search(tx, node, input, ctx)
        }
        LogicalPlan::VectorDistance { node, input } => {
            execute_vector_distance(tx, node, input, ctx)
        }
        // Fallback: evaluate to entities and expose all collected columns.
        _ => {
            let entities = execute_logical_plan(tx, logical, ctx)?;
            let columns = collect_all_columns(&entities);
            let scan = StorageScan::new(entities, columns);
            let schema = scan.schema();
            let rows = scan.collect_rows();
            Ok(ResultSet::with_rows(schema, rows))
        }
    }
}
fn execute_aggregate<T: Transaction>(
tx: &DatabaseTransaction<T>,
node: &manifoldb_query::plan::logical::AggregateNode,
input: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<ResultSet> {
let entities = execute_logical_plan(tx, input, ctx)?;
let columns = collect_all_columns(&entities);
let scan = StorageScan::new(entities, columns.clone());
let rows: Vec<Vec<Value>> = scan.collect_values();
let input_op: Box<dyn Operator> = Box::new(ValuesOp::with_columns(columns, rows));
let mut agg_op = HashAggregateOp::new(
node.group_by.clone(),
node.aggregates.clone(),
node.having.clone(),
input_op,
);
agg_op.open(ctx).map_err(|e| Error::Execution(e.to_string()))?;
let schema = agg_op.schema();
let mut result_rows = Vec::new();
while let Some(row) = agg_op.next().map_err(|e| Error::Execution(e.to_string()))? {
result_rows.push(row);
}
agg_op.close().map_err(|e| Error::Execution(e.to_string()))?;
Ok(ResultSet::with_rows(schema, result_rows))
}
fn execute_distinct<T: Transaction>(
tx: &DatabaseTransaction<T>,
input: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<ResultSet> {
if contains_join(input) {
let join_result = execute_join_plan(tx, input, ctx)?;
return deduplicate_result_set(join_result, ctx);
}
if let LogicalPlan::Project { node, input: proj_input } = input {
let entities = execute_logical_plan(tx, proj_input, ctx)?;
let has_wildcard = node.exprs.iter().any(|e| matches!(e, LogicalExpr::Wildcard));
let (columns, rows): (Vec<String>, Vec<Vec<Value>>) = if has_wildcard {
let cols = collect_all_columns(&entities);
let scan = StorageScan::new(entities, cols.clone());
(cols, scan.collect_values())
} else {
let cols: Vec<String> = node.exprs.iter().map(|e| expr_to_column_name(e)).collect();
let row_vals: Vec<Vec<Value>> = entities
.iter()
.map(|entity| {
node.exprs.iter().map(|expr| evaluate_expr(expr, entity, ctx)).collect()
})
.collect();
(cols, row_vals)
};
let group_by: Vec<LogicalExpr> = columns.iter().map(|c| LogicalExpr::column(c)).collect();
let input_op: Box<dyn Operator> = Box::new(ValuesOp::with_columns(columns, rows));
let mut agg_op = HashAggregateOp::new(group_by, vec![], None, input_op);
agg_op.open(ctx).map_err(|e| Error::Execution(e.to_string()))?;
let schema = agg_op.schema();
let mut result_rows = Vec::new();
while let Some(row) = agg_op.next().map_err(|e| Error::Execution(e.to_string()))? {
result_rows.push(row);
}
agg_op.close().map_err(|e| Error::Execution(e.to_string()))?;
return Ok(ResultSet::with_rows(schema, result_rows));
}
let entities = execute_logical_plan(tx, input, ctx)?;
let columns = collect_all_columns(&entities);
let scan = StorageScan::new(entities, columns.clone());
let rows: Vec<Vec<Value>> = scan.collect_values();
let group_by: Vec<LogicalExpr> = columns.iter().map(|c| LogicalExpr::column(c)).collect();
let input_op: Box<dyn Operator> = Box::new(ValuesOp::with_columns(columns, rows));
let mut agg_op = HashAggregateOp::new(
group_by,
vec![], None, input_op,
);
agg_op.open(ctx).map_err(|e| Error::Execution(e.to_string()))?;
let schema = agg_op.schema();
let mut result_rows = Vec::new();
while let Some(row) = agg_op.next().map_err(|e| Error::Execution(e.to_string()))? {
result_rows.push(row);
}
agg_op.close().map_err(|e| Error::Execution(e.to_string()))?;
Ok(ResultSet::with_rows(schema, result_rows))
}
fn deduplicate_result_set(result: ResultSet, ctx: &ExecutionContext) -> Result<ResultSet> {
let schema = result.schema_arc();
let columns: Vec<String> = schema.columns().iter().map(|c| (*c).to_string()).collect();
let group_by: Vec<LogicalExpr> = columns.iter().map(|c| LogicalExpr::column(c)).collect();
let rows: Vec<Vec<Value>> = result
.into_rows()
.into_iter()
.map(|row| {
(0..row.schema().columns().len())
.map(|i| row.get(i).cloned().unwrap_or(Value::Null))
.collect()
})
.collect();
let input_op: Box<dyn Operator> = Box::new(ValuesOp::with_columns(columns, rows));
let mut agg_op = HashAggregateOp::new(
group_by,
vec![], None, input_op,
);
agg_op.open(ctx).map_err(|e| Error::Execution(e.to_string()))?;
let output_schema = agg_op.schema();
let mut result_rows = Vec::new();
while let Some(row) = agg_op.next().map_err(|e| Error::Execution(e.to_string()))? {
result_rows.push(row);
}
agg_op.close().map_err(|e| Error::Execution(e.to_string()))?;
Ok(ResultSet::with_rows(output_schema, result_rows))
}
fn execute_union<T: Transaction>(
tx: &DatabaseTransaction<T>,
node: &UnionNode,
inputs: &[LogicalPlan],
ctx: &ExecutionContext,
) -> Result<ResultSet> {
let input_ops: Vec<Box<dyn Operator>> =
inputs.iter().map(|input| plan_to_operator(tx, input, ctx)).collect::<Result<Vec<_>>>()?;
if input_ops.is_empty() {
return Ok(ResultSet::new(Arc::new(Schema::empty())));
}
let mut union_op = UnionOp::new(input_ops, node.all);
union_op.open(ctx).map_err(|e| Error::Execution(e.to_string()))?;
let schema = union_op.schema();
let mut result_rows = Vec::new();
while let Some(row) = union_op.next().map_err(|e| Error::Execution(e.to_string()))? {
result_rows.push(row);
}
union_op.close().map_err(|e| Error::Execution(e.to_string()))?;
Ok(ResultSet::with_rows(schema, result_rows))
}
fn execute_set_op<T: Transaction>(
tx: &DatabaseTransaction<T>,
node: &SetOpNode,
left: &LogicalPlan,
right: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<ResultSet> {
let left_op = plan_to_operator(tx, left, ctx)?;
let right_op = plan_to_operator(tx, right, ctx)?;
let mut set_op = SetOpOp::new(node.op_type, left_op, right_op);
set_op.open(ctx).map_err(|e| Error::Execution(e.to_string()))?;
let schema = set_op.schema();
let mut result_rows = Vec::new();
while let Some(row) = set_op.next().map_err(|e| Error::Execution(e.to_string()))? {
result_rows.push(row);
}
set_op.close().map_err(|e| Error::Execution(e.to_string()))?;
Ok(ResultSet::with_rows(schema, result_rows))
}
/// Executes a single graph expansion step.
///
/// The input is run as a graph plan. The nodes bound to `node.src_var` are
/// extracted from it and expanded by `graph_accessor::execute_expand_operation`.
fn execute_expand<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    node: &ExpandNode,
    input: &LogicalPlan,
    ctx: &ExecutionContext,
) -> Result<ResultSet> {
    let upstream = execute_graph_plan(tx, input, ctx)?;
    graph_accessor::execute_expand_operation(
        tx,
        node,
        graph_accessor::extract_source_nodes(upstream, &node.src_var),
    )
}
/// Executes a multi-hop path scan as a sequence of expansion steps.
///
/// The input graph plan supplies the initial rows. Each step extracts its
/// source variable from the previous step's rows and expands it. The first
/// failing step aborts the scan.
fn execute_path_scan<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    node: &PathScanNode,
    input: &LogicalPlan,
    ctx: &ExecutionContext,
) -> Result<ResultSet> {
    let seed = execute_graph_plan(tx, input, ctx)?;
    node.steps.iter().try_fold(seed, |rows_so_far, step| {
        let sources = graph_accessor::extract_source_nodes(rows_so_far, &step.expand.src_var);
        graph_accessor::execute_expand_operation(tx, &step.expand, sources)
    })
}
/// Executes an approximate-nearest-neighbour search.
///
/// If the input resolves to a single base table and a vector index exists for
/// `node.vector_column` on it, the index is used. Index lookup errors are
/// treated as "no index". Otherwise the search falls back to a brute-force
/// scan.
fn execute_ann_search<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    node: &AnnSearchNode,
    input: &LogicalPlan,
    ctx: &ExecutionContext,
) -> Result<ResultSet> {
    let usable_index = extract_table_name(input).and_then(|table| {
        crate::vector::find_index_for_column(tx, &table, &node.vector_column).ok().flatten()
    });
    match usable_index {
        Some(idx_name) => execute_ann_search_with_index(tx, node, input, ctx, &idx_name),
        None => execute_ann_search_brute_force(tx, node, input, ctx),
    }
}
/// Executes an ANN search through the vector index named `index_name`.
///
/// `node.query_vector` is evaluated to a vector and wrapped as an `Embedding`.
/// The logical `input` is evaluated to candidate entities. The index search is
/// restricted to those candidate ids and, if present, to entities satisfying
/// `node.filter`. Search hits are mapped back to candidate entities and emitted
/// as rows over `_rowid` plus every collected property. When
/// `node.include_distance` is set, a trailing distance column is added, named
/// by `node.distance_alias` or defaulting to `"distance"`.
///
/// # Errors
/// `Error::Execution` if the query vector cannot be evaluated (the underlying
/// error is discarded), is not a `Value::Vector`, cannot form an `Embedding`,
/// or if the index search fails. Input-plan errors propagate.
fn execute_ann_search_with_index<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    node: &AnnSearchNode,
    input: &LogicalPlan,
    ctx: &ExecutionContext,
    index_name: &str,
) -> Result<ResultSet> {
    use manifoldb_vector::types::Embedding;
    use std::collections::HashSet;
    let query_value = evaluate_literal_expr(&node.query_vector, ctx)
        .map_err(|_| Error::Execution("Could not evaluate query vector".to_string()))?;
    let query_vec = match query_value {
        Value::Vector(v) => v,
        _ => {
            return Err(Error::Execution("Query must be a vector".to_string()));
        }
    };
    let query = Embedding::new(query_vec).map_err(|e| Error::Execution(e.to_string()))?;
    // Candidate entities from the (possibly filtered) input plan.
    let entities = execute_logical_plan(tx, input, ctx)?;
    let mut columns = collect_all_columns(&entities);
    let matching_ids: HashSet<manifoldb_core::EntityId> = entities.iter().map(|e| e.id).collect();
    let entity_map: std::collections::HashMap<manifoldb_core::EntityId, &manifoldb_core::Entity> =
        entities.iter().map(|e| (e.id, e)).collect();
    // NOTE(review): whenever `entity_map` is non-empty, `matching_ids` is too,
    // so `has_filter` is then always true. The unfiltered branch below is only
    // reached when the input produced no entities, and every hit it returns is
    // discarded by the `entity_map` lookup when building rows.
    let has_filter = node.filter.is_some() || !matching_ids.is_empty();
    let search_result = if has_filter && !entity_map.is_empty() {
        // A hit is accepted only if it is a candidate entity and, when a
        // filter is present, the filter evaluates true for that entity.
        let predicate = |id: manifoldb_core::EntityId| {
            if !matching_ids.contains(&id) {
                return false;
            }
            if let Some(ref filter_expr) = node.filter {
                if let Some(entity) = entity_map.get(&id) {
                    return evaluate_predicate(filter_expr, entity, ctx);
                }
                return false;
            }
            true
        };
        crate::vector::search_index_filtered(
            tx,
            index_name,
            &query,
            node.k,
            predicate,
            node.params.ef_search,
            None,
        )
        .map_err(|e| Error::Execution(e.to_string()))?
    } else {
        crate::vector::search_index(tx, index_name, &query, node.k, node.params.ef_search)
            .map_err(|e| Error::Execution(e.to_string()))?
    };
    if node.include_distance {
        let distance_col = node.distance_alias.clone().unwrap_or_else(|| "distance".to_string());
        columns.push(distance_col);
    }
    let schema = Arc::new(Schema::new(columns.clone()));
    let mut result_rows = Vec::new();
    // Entity-backed columns only; the distance column (if any) was pushed last,
    // so `columns.len() - 1` cannot underflow here.
    let data_columns: Vec<&String> = if node.include_distance {
        columns.iter().take(columns.len() - 1).collect()
    } else {
        columns.iter().collect()
    };
    // Emit rows in the order returned by the index search; hits that are not
    // candidate entities are skipped.
    for result in search_result {
        if let Some(entity) = entity_map.get(&result.entity_id) {
            let mut values: Vec<Value> = data_columns
                .iter()
                .map(|col| {
                    if *col == "_rowid" {
                        Value::Int(entity.id.as_u64() as i64)
                    } else {
                        entity.get_property(col).cloned().unwrap_or(Value::Null)
                    }
                })
                .collect();
            if node.include_distance {
                values.push(Value::Float(f64::from(result.distance)));
            }
            result_rows.push(Row::new(Arc::clone(&schema), values));
        }
    }
    Ok(ResultSet::with_rows(schema, result_rows))
}
fn execute_ann_search_brute_force<T: Transaction>(
tx: &DatabaseTransaction<T>,
node: &AnnSearchNode,
input: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<ResultSet> {
let entities = execute_logical_plan(tx, input, ctx)?;
let columns = collect_all_columns(&entities);
let scan = StorageScan::new(entities, columns.clone());
let rows: Vec<Vec<Value>> = scan.collect_values();
let input_op: Box<dyn Operator> = Box::new(ValuesOp::with_columns(columns, rows));
let mut search_op = BruteForceSearchOp::new(
node.vector_column.clone(),
node.query_vector.clone(),
node.metric,
node.k,
node.include_distance,
node.distance_alias.clone(),
input_op,
);
search_op.open(ctx).map_err(|e| Error::Execution(e.to_string()))?;
let schema = search_op.schema();
let mut result_rows = Vec::new();
while let Some(row) = search_op.next().map_err(|e| Error::Execution(e.to_string()))? {
result_rows.push(row);
}
search_op.close().map_err(|e| Error::Execution(e.to_string()))?;
Ok(ResultSet::with_rows(schema, result_rows))
}
/// Finds the base table a plan reads from, looking through row-preserving
/// wrappers (`Filter`, `Project`, `Sort`, `Limit`, `Alias`).
///
/// Returns `None` as soon as any other node kind is reached.
fn extract_table_name(plan: &LogicalPlan) -> Option<String> {
    let mut current = plan;
    loop {
        current = match current {
            LogicalPlan::Scan(scan) => return Some(scan.table_name.clone()),
            LogicalPlan::Filter { input, .. }
            | LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Alias { input, .. } => input.as_ref(),
            _ => return None,
        };
    }
}
fn execute_vector_distance<T: Transaction>(
tx: &DatabaseTransaction<T>,
node: &VectorDistanceNode,
input: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<ResultSet> {
let entities = execute_logical_plan(tx, input, ctx)?;
let mut columns = collect_all_columns(&entities);
let distance_col = node.alias.clone().unwrap_or_else(|| "distance".to_string());
columns.push(distance_col.clone());
let schema = Arc::new(Schema::new(columns.clone()));
let mut result_rows = Vec::new();
for entity in &entities {
let left_val = evaluate_expr(&node.left, entity, ctx);
let right_val = evaluate_expr(&node.right, entity, ctx);
let distance = compute_vector_distance(&left_val, &right_val, &node.metric);
let mut values: Vec<Value> = columns[..columns.len() - 1]
.iter()
.map(|col| {
if col == "_rowid" {
Value::Int(entity.id.as_u64() as i64)
} else {
entity.get_property(col).cloned().unwrap_or(Value::Null)
}
})
.collect();
values.push(distance);
result_rows.push(Row::new(Arc::clone(&schema), values));
}
Ok(ResultSet::with_rows(schema, result_rows))
}
/// Computes the distance between two vector values under `metric`.
///
/// Both operands must be `Value::Vector` of equal length, otherwise the result
/// is `Value::Null`. Arithmetic is done in `f32` and widened to `f64`:
/// - Euclidean: sqrt of the summed squared differences;
/// - Cosine: `1 - cos(a, b)`, or `f32::MAX` if either vector has zero norm;
/// - InnerProduct: the negated dot product (smaller means more similar);
/// - Manhattan: summed absolute differences;
/// - Hamming: count of positions differing by more than `f32::EPSILON`.
fn compute_vector_distance(left: &Value, right: &Value, metric: &DistanceMetric) -> Value {
    let (a, b) = match (left, right) {
        (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => (a, b),
        _ => return Value::Null,
    };
    let pairs = || a.iter().zip(b.iter());
    let dot = || pairs().map(|(x, y)| x * y).sum::<f32>();

    let distance: f32 = match metric {
        DistanceMetric::Euclidean => pairs().map(|(x, y)| (x - y).powi(2)).sum::<f32>().sqrt(),
        DistanceMetric::Cosine => {
            let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
            let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
            if norm_a == 0.0 || norm_b == 0.0 {
                f32::MAX
            } else {
                1.0 - dot() / (norm_a * norm_b)
            }
        }
        DistanceMetric::InnerProduct => -dot(),
        DistanceMetric::Manhattan => pairs().map(|(x, y)| (x - y).abs()).sum(),
        DistanceMetric::Hamming => {
            pairs().filter(|(x, y)| (*x - *y).abs() > f32::EPSILON).count() as f32
        }
    };
    Value::Float(f64::from(distance))
}
fn execute_graph_projection<T: Transaction>(
tx: &DatabaseTransaction<T>,
exprs: &[LogicalExpr],
input: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<ResultSet> {
let graph_result = execute_graph_plan(tx, input, ctx)?;
let has_wildcard = exprs.iter().any(|e| matches!(e, LogicalExpr::Wildcard));
if has_wildcard {
Ok(graph_result)
} else {
let projected_columns: Vec<String> = exprs.iter().map(|e| expr_to_column_name(e)).collect();
let new_schema = Arc::new(Schema::new(projected_columns.clone()));
let mut projected_rows = Vec::new();
for row in graph_result.rows() {
let values: Vec<Value> =
exprs.iter().map(|expr| evaluate_row_expr(expr, row)).collect();
projected_rows.push(Row::new(Arc::clone(&new_schema), values));
}
Ok(ResultSet::with_rows(new_schema, projected_rows))
}
}
/// Executes a plan that may contain graph operators, producing rows directly.
///
/// - `Expand` / `PathScan` are delegated to their executors.
/// - `Filter`, `Sort`, `Limit` and `Alias` are applied row-wise on top of a
///   recursively executed graph input.
/// - A bare `Scan` is materialized with its columns prefixed by the scan alias,
///   or by the table name when there is no alias (e.g. `p.name`), so graph
///   expressions can address them.
/// - Any other node is executed through the entity path with unprefixed
///   columns.
///
/// # Errors
/// Propagates transaction errors from entity iteration and errors from the
/// delegated executors.
fn execute_graph_plan<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    plan: &LogicalPlan,
    ctx: &ExecutionContext,
) -> Result<ResultSet> {
    match plan {
        LogicalPlan::Expand { node, input } => execute_expand(tx, node, input, ctx),
        LogicalPlan::PathScan { node, input } => execute_path_scan(tx, node, input, ctx),
        LogicalPlan::Filter { node, input } => {
            let graph_result = execute_graph_plan(tx, input, ctx)?;
            let schema = graph_result.schema_arc();
            // Only rows whose predicate is exactly `true` survive; NULL or
            // non-boolean results drop the row.
            let filtered_rows: Vec<Row> = graph_result
                .into_rows()
                .into_iter()
                .filter(|row| {
                    let result = evaluate_row_expr(&node.predicate, row);
                    matches!(result, Value::Bool(true))
                })
                .collect();
            Ok(ResultSet::with_rows(schema, filtered_rows))
        }
        LogicalPlan::Sort { node, input } => {
            let graph_result = execute_graph_plan(tx, input, ctx)?;
            let schema = graph_result.schema_arc();
            let mut rows = graph_result.into_rows();
            if !node.order_by.is_empty() {
                // Lexicographic over the ORDER BY keys; the first non-equal key decides.
                rows.sort_by(|a, b| {
                    for order in &node.order_by {
                        let va = evaluate_row_expr(&order.expr, a);
                        let vb = evaluate_row_expr(&order.expr, b);
                        let cmp = compare_values(&va, &vb);
                        let cmp = if order.ascending { cmp } else { cmp.reverse() };
                        if cmp != std::cmp::Ordering::Equal {
                            return cmp;
                        }
                    }
                    std::cmp::Ordering::Equal
                });
            }
            Ok(ResultSet::with_rows(schema, rows))
        }
        LogicalPlan::Limit { node, input } => {
            let graph_result = execute_graph_plan(tx, input, ctx)?;
            let schema = graph_result.schema_arc();
            // Skip the offset first, then cap at the limit. Unlike computing
            // `(offset + limit) - offset`, this cannot underflow when the offset
            // exceeds the row count (no LIMIT), nor overflow on huge values.
            let remaining =
                graph_result.into_rows().into_iter().skip(node.offset.unwrap_or(0));
            let limited_rows: Vec<Row> = match node.limit {
                Some(limit) => remaining.take(limit).collect(),
                None => remaining.collect(),
            };
            Ok(ResultSet::with_rows(schema, limited_rows))
        }
        LogicalPlan::Alias { input, .. } => {
            execute_graph_plan(tx, input, ctx)
        }
        LogicalPlan::Scan(scan_node) => {
            let label = &scan_node.table_name;
            let alias = scan_node.alias.as_deref().unwrap_or(label);
            let entities = tx.iter_entities(Some(label)).map_err(Error::Transaction)?;
            let columns = collect_all_columns(&entities);
            // Qualify every column with the scan alias so graph expressions
            // like `alias.prop` resolve against this row set.
            let prefixed_columns: Vec<String> =
                columns.iter().map(|c| format!("{}.{}", alias, c)).collect();
            let schema = Arc::new(Schema::new(prefixed_columns.clone()));
            let rows: Vec<Row> = entities
                .iter()
                .map(|entity| {
                    let values: Vec<Value> = columns
                        .iter()
                        .map(|col| {
                            if col == "_rowid" {
                                Value::Int(entity.id.as_u64() as i64)
                            } else {
                                entity.get_property(col).cloned().unwrap_or(Value::Null)
                            }
                        })
                        .collect();
                    Row::new(Arc::clone(&schema), values)
                })
                .collect();
            Ok(ResultSet::with_rows(schema, rows))
        }
        _ => {
            let entities = execute_logical_plan(tx, plan, ctx)?;
            let columns = collect_all_columns(&entities);
            let scan = StorageScan::new(entities, columns);
            let schema = scan.schema();
            let rows = scan.collect_rows();
            Ok(ResultSet::with_rows(schema, rows))
        }
    }
}
/// Materializes a logical plan into a `ValuesOp` so it can feed row operators
/// such as `UnionOp` and `SetOpOp`.
///
/// - Plans containing a join are executed as a join and flattened by position
///   (missing cells become `Value::Null`).
/// - A `Project` yields either all collected columns (`*`) or the evaluated
///   projection expressions.
/// - Anything else yields `_rowid` plus all collected properties.
///
/// # Errors
/// Propagates join/entity execution errors.
fn plan_to_operator<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    plan: &LogicalPlan,
    ctx: &ExecutionContext,
) -> Result<Box<dyn Operator>> {
    let (columns, rows): (Vec<String>, Vec<Vec<Value>>) = if contains_join(plan) {
        let joined = execute_join_plan(tx, plan, ctx)?;
        let joined_schema = joined.schema_arc();
        let names: Vec<String> =
            joined_schema.columns().iter().map(|c| (*c).to_string()).collect();
        let values: Vec<Vec<Value>> = joined
            .into_rows()
            .into_iter()
            .map(|row| {
                let width = row.schema().columns().len();
                (0..width).map(|i| row.get(i).cloned().unwrap_or(Value::Null)).collect()
            })
            .collect();
        (names, values)
    } else if let LogicalPlan::Project { node, input } = plan {
        let entities = execute_logical_plan(tx, input, ctx)?;
        if node.exprs.iter().any(|e| matches!(e, LogicalExpr::Wildcard)) {
            let names = collect_all_columns(&entities);
            let values: Vec<Vec<Value>> =
                StorageScan::new(entities, names.clone()).collect_values();
            (names, values)
        } else {
            let names: Vec<String> = node.exprs.iter().map(|e| expr_to_column_name(e)).collect();
            let values: Vec<Vec<Value>> = entities
                .iter()
                .map(|entity| {
                    node.exprs.iter().map(|expr| evaluate_expr(expr, entity, ctx)).collect()
                })
                .collect();
            (names, values)
        }
    } else {
        let entities = execute_logical_plan(tx, plan, ctx)?;
        let names = collect_all_columns(&entities);
        let values: Vec<Vec<Value>> = StorageScan::new(entities, names.clone()).collect_values();
        (names, values)
    };
    Ok(Box::new(ValuesOp::with_columns(columns, rows)))
}
/// Evaluates a projection expression against an already-materialized row.
///
/// Supported forms: literals, columns (looked up by name in `schema`),
/// positional parameters, aliases (evaluated through to the inner
/// expression), and aggregate calls. Aggregate calls are looked up by their
/// `Display` text as a column name. Every other form, including `*`, and any
/// missing column or parameter evaluate to `Value::Null`.
fn evaluate_expr_on_row(
    expr: &LogicalExpr,
    row: &Row,
    schema: &Arc<Schema>,
    ctx: &ExecutionContext,
) -> Value {
    let lookup = |column: &str| -> Value {
        schema.index_of(column).and_then(|idx| row.get(idx).cloned()).unwrap_or(Value::Null)
    };

    match expr {
        LogicalExpr::Literal(lit) => literal_to_value(lit),
        LogicalExpr::Column { name, .. } => lookup(name),
        LogicalExpr::Parameter(idx) => {
            ctx.get_parameter(*idx as u32).cloned().unwrap_or(Value::Null)
        }
        LogicalExpr::Alias { expr: inner, .. } => evaluate_expr_on_row(inner, row, schema, ctx),
        LogicalExpr::AggregateFunction { func, .. } => lookup(&format!("{func}")),
        _ => Value::Null,
    }
}
/// Returns the union of property names across `entities`, prefixed by the
/// synthetic `_rowid` column.
///
/// Names appear in first-seen order: entities are visited in slice order and
/// each entity's `properties` keys in that map's own iteration order (which
/// may not be deterministic if it is a hash map — confirm). A property named
/// `_rowid` is not duplicated. An empty slice yields an empty list, without
/// `_rowid`.
fn collect_all_columns(entities: &[Entity]) -> Vec<String> {
    use std::collections::HashSet;

    if entities.is_empty() {
        return vec![];
    }
    // `seen` mirrors `cols` for O(1) membership checks. A linear
    // `Vec::contains` here would be O(entities × keys × columns).
    let mut seen: HashSet<&str> = HashSet::new();
    seen.insert("_rowid");
    let mut cols: Vec<String> = vec!["_rowid".to_string()];
    for entity in entities {
        for key in entity.properties.keys() {
            if seen.insert(key.as_str()) {
                cols.push(key.clone());
            }
        }
    }
    cols
}
fn execute_logical_plan<T: Transaction>(
tx: &DatabaseTransaction<T>,
plan: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<Vec<Entity>> {
match plan {
LogicalPlan::Scan(scan_node) => {
let label = &scan_node.table_name;
let entities = tx.iter_entities(Some(label)).map_err(Error::Transaction)?;
Ok(entities)
}
LogicalPlan::Filter { node, input } => {
let entities = execute_logical_plan(tx, input, ctx)?;
let filtered: Vec<Entity> = entities
.into_iter()
.filter(|entity| evaluate_predicate(&node.predicate, entity, ctx))
.collect();
Ok(filtered)
}
LogicalPlan::Project { input, .. } => {
execute_logical_plan(tx, input, ctx)
}
LogicalPlan::Limit { node, input } => {
let entities = execute_logical_plan(tx, input, ctx)?;
let start = node.offset.unwrap_or(0);
let end = node.limit.map(|l| start + l).unwrap_or(entities.len());
Ok(entities.into_iter().skip(start).take(end - start).collect())
}
LogicalPlan::Sort { node, input } => {
let mut entities = execute_logical_plan(tx, input, ctx)?;
if !node.order_by.is_empty() {
entities.sort_by(|a, b| {
for order in &node.order_by {
let va = evaluate_expr(&order.expr, a, ctx);
let vb = evaluate_expr(&order.expr, b, ctx);
let cmp = compare_values(&va, &vb);
let cmp = if order.ascending { cmp } else { cmp.reverse() };
if cmp != std::cmp::Ordering::Equal {
return cmp;
}
}
std::cmp::Ordering::Equal
});
}
Ok(entities)
}
LogicalPlan::Values(_) => {
Ok(Vec::new())
}
LogicalPlan::Empty { .. } => Ok(Vec::new()),
LogicalPlan::Aggregate { input, .. } => {
execute_logical_plan(tx, input, ctx)
}
LogicalPlan::Distinct { input, .. } => {
execute_logical_plan(tx, input, ctx)
}
LogicalPlan::Alias { input, .. } => {
execute_logical_plan(tx, input, ctx)
}
LogicalPlan::Join { .. } => {
Err(Error::Execution(
"JOIN queries should be executed through execute_physical_plan, not execute_logical_plan"
.to_string(),
))
}
LogicalPlan::SetOp { .. } => {
Err(Error::Execution(
"Set operations should be executed through execute_physical_plan, not execute_logical_plan"
.to_string(),
))
}
LogicalPlan::Union { .. } => {
Err(Error::Execution(
"UNION queries should be executed through execute_physical_plan, not execute_logical_plan"
.to_string(),
))
}
LogicalPlan::Expand { .. } => Err(Error::Execution(
"Graph EXPAND queries not yet supported in entity execution".to_string(),
)),
LogicalPlan::PathScan { .. } => Err(Error::Execution(
"Graph path scan queries not yet supported in entity execution".to_string(),
)),
LogicalPlan::AnnSearch { input, .. } => {
execute_logical_plan(tx, input, ctx)
}
LogicalPlan::VectorDistance { input, .. } => {
execute_logical_plan(tx, input, ctx)
}
LogicalPlan::Insert { .. } | LogicalPlan::Update { .. } | LogicalPlan::Delete { .. } => {
Err(Error::Execution(
"DML statements should be executed via execute_statement, not execute_logical_plan"
.to_string(),
))
}
LogicalPlan::CreateTable(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CreateIndex(_)
| LogicalPlan::DropIndex(_)
| LogicalPlan::CreateCollection(_)
| LogicalPlan::DropCollection(_) => Err(Error::Execution(
"DDL statements should be executed via execute_statement, not execute_logical_plan"
.to_string(),
)),
LogicalPlan::HybridSearch { input, .. } => {
execute_logical_plan(tx, input, ctx)
}
}
}
fn execute_insert<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
table: &str,
columns: &[String],
input: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<u64> {
use crate::collection::{CollectionManager, CollectionName};
use manifoldb_vector::types::VectorData;
let mut count = 0;
let collection = CollectionName::new(table)
.ok()
.and_then(|name| CollectionManager::get(tx, &name).ok().flatten());
let resolved_columns: Vec<String> = if columns.is_empty() {
match SchemaManager::get_table(tx, table) {
Ok(Some(schema)) => schema.columns.iter().map(|c| c.name.clone()).collect(),
Ok(None) => {
Vec::new()
}
Err(e) => {
return Err(Error::Execution(format!("Failed to get table schema: {e}")));
}
}
} else {
columns.to_vec()
};
if let LogicalPlan::Values(values_node) = input {
for row_exprs in &values_node.rows {
let mut entity = tx.create_entity().map_err(Error::Transaction)?;
entity = entity.with_label(table);
let mut vectors_to_store: Vec<(String, VectorData)> = Vec::new();
for (i, col) in resolved_columns.iter().enumerate() {
if let Some(expr) = row_exprs.get(i) {
let value = evaluate_literal_expr(expr, ctx)?;
if let Some(ref coll) = collection {
if coll.has_vector(col) {
if let Some(vector_data) = value_to_vector_data(&value) {
vectors_to_store.push((col.clone(), vector_data));
}
continue;
}
}
entity = entity.with_property(col, value);
}
}
tx.put_entity(&entity).map_err(Error::Transaction)?;
super::index_maintenance::EntityIndexMaintenance::on_insert(tx, &entity)
.map_err(|e| Error::Execution(format!("property index update failed: {e}")))?;
if let Some(ref coll) = collection {
if let Some(provider) = ctx.collection_vector_provider() {
for (vector_name, vector_data) in vectors_to_store {
provider
.upsert_vector(coll.id(), entity.id, table, &vector_name, &vector_data)
.map_err(|e| Error::Execution(format!("vector storage failed: {e}")))?;
}
}
} else {
crate::vector::update_entity_in_indexes(tx, &entity, None)
.map_err(|e| Error::Execution(format!("vector index update failed: {e}")))?;
}
count += 1;
}
}
Ok(count)
}
/// Converts a vector-valued `Value` (dense, sparse, or multi-vector) into the
/// corresponding `VectorData`; any other value yields `None`.
fn value_to_vector_data(value: &Value) -> Option<manifoldb_vector::types::VectorData> {
    use manifoldb_vector::types::VectorData;
    let converted = match value {
        Value::Vector(dense) => VectorData::Dense(dense.clone()),
        Value::SparseVector(sparse) => VectorData::Sparse(sparse.clone()),
        Value::MultiVector(multi) => VectorData::Multi(multi.clone()),
        _ => return None,
    };
    Some(converted)
}
fn execute_update<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
table: &str,
assignments: &[(String, LogicalExpr)],
filter: &Option<LogicalExpr>,
ctx: &ExecutionContext,
) -> Result<u64> {
use crate::collection::{CollectionManager, CollectionName};
let collection = CollectionName::new(table)
.ok()
.and_then(|name| CollectionManager::get(tx, &name).ok().flatten());
let entities = tx.iter_entities(Some(table)).map_err(Error::Transaction)?;
let mut count = 0;
for entity in entities {
let matches = match filter {
Some(pred) => evaluate_predicate(pred, &entity, ctx),
None => true,
};
if matches {
let old_entity = entity.clone();
let mut updated_entity = entity;
let mut vectors_to_update: Vec<(String, manifoldb_vector::types::VectorData)> =
Vec::new();
for (col, expr) in assignments {
let value = evaluate_expr(expr, &updated_entity, ctx);
if let Some(ref coll) = collection {
if coll.has_vector(col) {
if let Some(vector_data) = value_to_vector_data(&value) {
vectors_to_update.push((col.clone(), vector_data));
}
updated_entity.properties.remove(col);
continue;
}
}
updated_entity.set_property(col, value);
}
tx.put_entity(&updated_entity).map_err(Error::Transaction)?;
super::index_maintenance::EntityIndexMaintenance::on_update(
tx,
&old_entity,
&updated_entity,
)
.map_err(|e| Error::Execution(format!("property index update failed: {e}")))?;
if let Some(ref coll) = collection {
if let Some(provider) = ctx.collection_vector_provider() {
for (vector_name, vector_data) in vectors_to_update {
provider
.upsert_vector(
coll.id(),
updated_entity.id,
table,
&vector_name,
&vector_data,
)
.map_err(|e| Error::Execution(format!("vector storage failed: {e}")))?;
}
}
} else {
crate::vector::update_entity_in_indexes(tx, &updated_entity, Some(&old_entity))
.map_err(|e| Error::Execution(format!("vector index update failed: {e}")))?;
}
count += 1;
}
}
Ok(count)
}
fn execute_delete<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
table: &str,
filter: &Option<LogicalExpr>,
ctx: &ExecutionContext,
) -> Result<u64> {
use crate::collection::{CollectionManager, CollectionName};
let collection = CollectionName::new(table)
.ok()
.and_then(|name| CollectionManager::get(tx, &name).ok().flatten());
let entities = tx.iter_entities(Some(table)).map_err(Error::Transaction)?;
let mut count = 0;
for entity in entities {
let matches = match filter {
Some(pred) => evaluate_predicate(pred, &entity, ctx),
None => true,
};
if matches {
super::index_maintenance::EntityIndexMaintenance::on_delete(tx, &entity)
.map_err(|e| Error::Execution(format!("property index removal failed: {e}")))?;
if let Some(ref coll) = collection {
if let Some(provider) = ctx.collection_vector_provider() {
provider
.delete_entity_vectors(coll.id(), entity.id, table)
.map_err(|e| Error::Execution(format!("vector deletion failed: {e}")))?;
}
} else {
crate::vector::remove_entity_from_indexes(tx, &entity)
.map_err(|e| Error::Execution(format!("vector index removal failed: {e}")))?;
}
tx.delete_entity(entity.id).map_err(Error::Transaction)?;
count += 1;
}
}
Ok(count)
}
/// Reports whether a `Join` node is reachable from `plan` by descending
/// through single-input wrapper nodes (filter, project, sort, limit,
/// distinct, alias, aggregate). Any other node kind stops the search.
fn contains_join(plan: &LogicalPlan) -> bool {
    let mut node = plan;
    loop {
        node = match node {
            LogicalPlan::Join { .. } => return true,
            LogicalPlan::Filter { input, .. }
            | LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input, .. }
            | LogicalPlan::Alias { input, .. }
            | LogicalPlan::Aggregate { input, .. } => &**input,
            _ => return false,
        };
    }
}
/// Reports whether a graph traversal (`Expand` or `PathScan`) is reachable
/// from `plan` by descending through single-input wrapper nodes (filter,
/// project, sort, limit, distinct, alias, aggregate). Any other node kind
/// stops the search.
fn contains_graph(plan: &LogicalPlan) -> bool {
    let mut node = plan;
    loop {
        node = match node {
            LogicalPlan::Expand { .. } | LogicalPlan::PathScan { .. } => return true,
            LogicalPlan::Filter { input, .. }
            | LogicalPlan::Project { input, .. }
            | LogicalPlan::Sort { input, .. }
            | LogicalPlan::Limit { input, .. }
            | LogicalPlan::Distinct { input, .. }
            | LogicalPlan::Alias { input, .. }
            | LogicalPlan::Aggregate { input, .. } => &**input,
            _ => return false,
        };
    }
}
fn execute_join<T: Transaction>(
tx: &DatabaseTransaction<T>,
node: &manifoldb_query::plan::logical::JoinNode,
left: &LogicalPlan,
right: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<ResultSet> {
let (left_entities, left_alias) = execute_join_input(tx, left, ctx)?;
let (right_entities, right_alias) = execute_join_input(tx, right, ctx)?;
let (left_op, left_cols) = entities_to_values_op(&left_entities, &left_alias);
let (right_op, right_cols) = entities_to_values_op(&right_entities, &right_alias);
let mut join_op = create_join_operator(
node.join_type,
node.condition.clone(),
&node.using_columns,
Box::new(left_op),
Box::new(right_op),
&left_cols,
&right_cols,
);
join_op.open(ctx).map_err(|e| Error::Execution(e.to_string()))?;
let mut rows = Vec::new();
while let Some(row) = join_op.next().map_err(|e| Error::Execution(e.to_string()))? {
rows.push(row);
}
join_op.close().map_err(|e| Error::Execution(e.to_string()))?;
let schema = join_op.schema();
Ok(ResultSet::with_rows(schema, rows))
}
fn execute_join_projection<T: Transaction>(
tx: &DatabaseTransaction<T>,
exprs: &[LogicalExpr],
input: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<ResultSet> {
let join_result = execute_join_plan(tx, input, ctx)?;
let has_wildcard = exprs.iter().any(|e| matches!(e, LogicalExpr::Wildcard));
if has_wildcard {
Ok(join_result)
} else {
let projected_columns: Vec<String> = exprs.iter().map(|e| expr_to_column_name(e)).collect();
let new_schema = Arc::new(Schema::new(projected_columns.clone()));
let mut projected_rows = Vec::new();
for row in join_result.rows() {
let values: Vec<Value> =
exprs.iter().map(|expr| evaluate_row_expr(expr, row)).collect();
projected_rows.push(Row::new(Arc::clone(&new_schema), values));
}
Ok(ResultSet::with_rows(new_schema, projected_rows))
}
}
/// Executes a plan whose output is the row set of a join.
///
/// `Join` nodes run through `execute_join`. `Filter`, `Sort`, `Limit`, and
/// `Alias` nodes above a join are applied to the joined rows using row-level
/// expression evaluation; a filter keeps only rows whose predicate evaluates
/// to `true`. Any other plan runs as an entity plan via
/// `execute_logical_plan` and is converted into rows.
fn execute_join_plan<T: Transaction>(
    tx: &DatabaseTransaction<T>,
    plan: &LogicalPlan,
    ctx: &ExecutionContext,
) -> Result<ResultSet> {
    match plan {
        LogicalPlan::Join { node, left, right } => execute_join(tx, node, left, right, ctx),
        LogicalPlan::Filter { node, input } => {
            let join_result = execute_join_plan(tx, input, ctx)?;
            let schema = join_result.schema_arc();
            let filtered_rows: Vec<Row> = join_result
                .into_rows()
                .into_iter()
                .filter(|row| {
                    let result = evaluate_row_expr(&node.predicate, row);
                    matches!(result, Value::Bool(true))
                })
                .collect();
            Ok(ResultSet::with_rows(schema, filtered_rows))
        }
        LogicalPlan::Sort { node, input } => {
            let join_result = execute_join_plan(tx, input, ctx)?;
            let schema = join_result.schema_arc();
            let mut rows = join_result.into_rows();
            if !node.order_by.is_empty() {
                rows.sort_by(|a, b| {
                    for order in &node.order_by {
                        let va = evaluate_row_expr(&order.expr, a);
                        let vb = evaluate_row_expr(&order.expr, b);
                        let cmp = compare_values(&va, &vb);
                        let cmp = if order.ascending { cmp } else { cmp.reverse() };
                        if cmp != std::cmp::Ordering::Equal {
                            return cmp;
                        }
                    }
                    std::cmp::Ordering::Equal
                });
            }
            Ok(ResultSet::with_rows(schema, rows))
        }
        LogicalPlan::Limit { node, input } => {
            let join_result = execute_join_plan(tx, input, ctx)?;
            let schema = join_result.schema_arc();
            // Skip/take directly: the former `end - start` underflowed when the
            // OFFSET exceeded the row count and no LIMIT was given.
            let remaining = join_result.into_rows().into_iter().skip(node.offset.unwrap_or(0));
            let limited_rows: Vec<Row> = match node.limit {
                Some(limit) => remaining.take(limit).collect(),
                None => remaining.collect(),
            };
            Ok(ResultSet::with_rows(schema, limited_rows))
        }
        LogicalPlan::Alias { input, .. } => execute_join_plan(tx, input, ctx),
        _ => {
            let entities = execute_logical_plan(tx, plan, ctx)?;
            let columns = collect_all_columns(&entities);
            let scan = StorageScan::new(entities, columns);
            let schema = scan.schema();
            let rows = scan.collect_rows();
            Ok(ResultSet::with_rows(schema, rows))
        }
    }
}
fn execute_join_input<T: Transaction>(
tx: &DatabaseTransaction<T>,
plan: &LogicalPlan,
ctx: &ExecutionContext,
) -> Result<(Vec<Entity>, String)> {
match plan {
LogicalPlan::Scan(scan_node) => {
let label = &scan_node.table_name;
let alias = scan_node.alias.as_deref().unwrap_or(label);
let entities = tx.iter_entities(Some(label)).map_err(Error::Transaction)?;
Ok((entities, alias.to_string()))
}
LogicalPlan::Alias { alias, input } => {
let (entities, _) = execute_join_input(tx, input, ctx)?;
Ok((entities, alias.clone()))
}
LogicalPlan::Filter { node, input } => {
let (entities, alias) = execute_join_input(tx, input, ctx)?;
let filtered: Vec<Entity> = entities
.into_iter()
.filter(|entity| evaluate_predicate(&node.predicate, entity, ctx))
.collect();
Ok((filtered, alias))
}
LogicalPlan::Join { node, left, right } => {
let result = execute_join(tx, node, left, right, ctx)?;
let entities: Vec<Entity> = result
.rows()
.iter()
.enumerate()
.map(|(i, row)| row_to_entity(row, i as u64))
.collect();
Ok((entities, "joined".to_string()))
}
_ => {
let entities = execute_logical_plan(tx, plan, ctx)?;
Ok((entities, "table".to_string()))
}
}
}
/// Converts entities into a `ValuesOp` whose column names are prefixed with
/// `prefix.`, returning the operator and its column names.
///
/// Columns are `_rowid` (the entity id) followed by every property key in
/// first-seen order. Missing properties become `Value::Null`. The `_rowid`
/// column is always emitted, even when `entities` is empty.
fn entities_to_values_op(entities: &[Entity], prefix: &str) -> (ValuesOp, Vec<String>) {
    let mut property_names = vec!["_rowid".to_string()];
    for key in entities.iter().flat_map(|entity| entity.properties.keys()) {
        if !property_names.contains(key) {
            property_names.push(key.clone());
        }
    }

    let columns: Vec<String> =
        property_names.iter().map(|name| format!("{prefix}.{name}")).collect();

    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(entities.len());
    for entity in entities {
        let row = property_names
            .iter()
            .map(|name| match name.as_str() {
                "_rowid" => Value::Int(entity.id.as_u64() as i64),
                _ => entity.get_property(name).cloned().unwrap_or(Value::Null),
            })
            .collect();
        rows.push(row);
    }

    (ValuesOp::with_columns(columns.clone(), rows), columns)
}
/// Builds the physical operator for one join.
///
/// If `using_columns` is non-empty, a `JOIN ... USING (...)` condition is
/// synthesised. For each listed column, the first left column and the first
/// right column whose names end in `.<col>` are equated, and the equalities
/// are AND-ed together. Columns not found on both sides are skipped. If none
/// are found, the explicit `condition` is used instead.
///
/// When `extract_equijoin_keys` can split the resulting condition into key
/// lists, a `HashJoinOp` is returned. Otherwise a `NestedLoopJoinOp` receives
/// the full condition (or `None` for a join without a condition).
fn create_join_operator(
    join_type: JoinType,
    condition: Option<LogicalExpr>,
    using_columns: &[String],
    left: Box<dyn Operator>,
    right: Box<dyn Operator>,
    left_cols: &[String],
    right_cols: &[String],
) -> Box<dyn Operator> {
    let join_condition = if using_columns.is_empty() {
        condition
    } else {
        // USING (col, ...): match columns by their `.col` suffix on each side.
        let conditions: Vec<LogicalExpr> = using_columns
            .iter()
            .filter_map(|col| {
                let left_col = left_cols.iter().find(|c| c.ends_with(&format!(".{}", col)));
                let right_col = right_cols.iter().find(|c| c.ends_with(&format!(".{}", col)));
                match (left_col, right_col) {
                    (Some(l), Some(r)) => Some(LogicalExpr::column(l).eq(LogicalExpr::column(r))),
                    _ => None,
                }
            })
            .collect();
        if conditions.is_empty() {
            condition
        } else {
            // Fold the equalities into `e1 AND e2 AND ...`.
            let mut combined = conditions.into_iter();
            let first = combined.next();
            first.map(|f| combined.fold(f, |acc, c| acc.and(c)))
        }
    };
    if let Some(ref cond) = join_condition {
        if let Some((left_keys, right_keys)) = extract_equijoin_keys(cond, left_cols, right_cols) {
            // NOTE(review): the 4th HashJoinOp argument is passed as `None`;
            // presumably it is an optional residual filter — confirm.
            return match join_type {
                // LEFT joins pass the inputs (and key lists) in swapped order.
                // NOTE(review): presumably HashJoinOp's build/probe argument
                // order determines which side's unmatched rows are preserved
                // and the output column order — confirm this swap yields LEFT
                // semantics.
                JoinType::Left => {
                    Box::new(HashJoinOp::new(join_type, right_keys, left_keys, None, right, left))
                }
                _ => Box::new(HashJoinOp::new(join_type, left_keys, right_keys, None, left, right)),
            };
        }
    }
    Box::new(NestedLoopJoinOp::new(join_type, join_condition, left, right))
}
/// Tries to decompose a join condition into hash-join key lists.
///
/// Returns `Some((left_keys, right_keys))` only when the *entire* condition
/// is a conjunction (`AND`) of column equalities in which one column belongs
/// to the left input and the other to the right input. Each pair is oriented
/// so that `left_keys[i]` refers to the left side. A name belongs to a side
/// if it equals one of that side's columns or matches one of them by a
/// `.name` suffix.
///
/// If any conjunct is not such an equality (e.g. the `a.x > 5` in
/// `a.id = b.id AND a.x > 5`), `None` is returned so the caller can fall back
/// to a join that evaluates the full condition. Previously such conjuncts
/// were dropped and only the equi-keys were kept. The caller builds the hash
/// join from those keys alone, so the extra predicate was silently ignored.
fn extract_equijoin_keys(
    condition: &LogicalExpr,
    left_cols: &[String],
    right_cols: &[String],
) -> Option<(Vec<LogicalExpr>, Vec<LogicalExpr>)> {
    use manifoldb_query::ast::BinaryOp;

    /// True when `name` is one of `cols`, exactly or as a `.name` suffix.
    fn belongs_to(cols: &[String], name: &str) -> bool {
        cols.iter().any(|c| c == name || c.ends_with(&format!(".{}", name)))
    }

    match condition {
        LogicalExpr::BinaryOp { left, op, right } => match op {
            BinaryOp::Eq => {
                let l = extract_column_name(left)?;
                let r = extract_column_name(right)?;
                if belongs_to(left_cols, l) && belongs_to(right_cols, r) {
                    Some((vec![*left.clone()], vec![*right.clone()]))
                } else if belongs_to(right_cols, l) && belongs_to(left_cols, r) {
                    // Written as `right.col = left.col`: flip the pair.
                    Some((vec![*right.clone()], vec![*left.clone()]))
                } else {
                    None
                }
            }
            BinaryOp::And => {
                // Both conjuncts must be usable keys; otherwise the whole
                // condition has to be evaluated by a nested-loop join.
                let (mut left_keys, mut right_keys) =
                    extract_equijoin_keys(left, left_cols, right_cols)?;
                let (more_left, more_right) = extract_equijoin_keys(right, left_cols, right_cols)?;
                left_keys.extend(more_left);
                right_keys.extend(more_right);
                Some((left_keys, right_keys))
            }
            _ => None,
        },
        _ => None,
    }
}
/// Returns the column name referenced by `expr`, looking through any number
/// of `Alias` wrappers; `None` for any other expression kind.
fn extract_column_name(expr: &LogicalExpr) -> Option<&str> {
    let mut current = expr;
    loop {
        match current {
            LogicalExpr::Column { name, .. } => return Some(name.as_str()),
            LogicalExpr::Alias { expr: inner, .. } => current = &**inner,
            _ => return None,
        }
    }
}
/// Converts a result row into an entity with the given synthetic `id`. Each
/// schema column becomes a property holding that column's value; columns
/// with no value in the row are skipped.
fn row_to_entity(row: &Row, id: u64) -> Entity {
    let mut record = Entity::new(manifoldb_core::EntityId::new(id));
    for (position, column) in row.schema().columns().iter().enumerate() {
        if let Some(cell) = row.get(position) {
            record.set_property(*column, cell.clone());
        }
    }
    record
}
/// Evaluates `expr` against a result row using the filter operator's
/// expression evaluator; an evaluation failure is reported as `Value::Null`.
fn evaluate_row_expr(expr: &LogicalExpr, row: &Row) -> Value {
    use manifoldb_query::exec::operators::filter;
    filter::evaluate_expr(expr, row).unwrap_or(Value::Null)
}
fn execute_create_table<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
node: &CreateTableNode,
) -> Result<u64> {
SchemaManager::create_table(tx, node).map_err(|e| Error::Execution(e.to_string()))?;
Ok(0) }
fn execute_drop_table<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
node: &DropTableNode,
) -> Result<u64> {
for table_name in &node.names {
SchemaManager::drop_table(tx, table_name, node.if_exists)
.map_err(|e| Error::Execution(e.to_string()))?;
let entities = tx.iter_entities(Some(table_name)).map_err(Error::Transaction)?;
for entity in entities {
tx.delete_entity(entity.id).map_err(Error::Transaction)?;
}
}
Ok(0)
}
/// Registers an index and populates it.
///
/// `USING hnsw` (case-insensitive) builds a vector index and reports 0 rows.
/// Every other index is treated as a B-tree property index and backfilled
/// from existing entities; the number of entries written is returned.
fn execute_create_index<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    node: &CreateIndexNode,
) -> Result<u64> {
    SchemaManager::create_index(tx, node).map_err(|e| Error::Execution(e.to_string()))?;
    let is_hnsw = node.using.as_ref().map_or(false, |method| method.to_lowercase() == "hnsw");
    if is_hnsw {
        build_hnsw_index(tx, node)?;
        Ok(0)
    } else {
        backfill_btree_index(tx, node)
    }
}
fn backfill_btree_index<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
node: &CreateIndexNode,
) -> Result<u64> {
use manifoldb_core::index::{IndexId, PropertyIndexEntry};
let column_names: Vec<String> =
node.columns.iter().map(|c| extract_column_name_from_expr(&c.expr)).collect();
if column_names.is_empty() {
return Ok(0);
}
if column_names.len() > 1 {
return Ok(0);
}
let column_name = &column_names[0];
let table_name = &node.table;
let index_id = IndexId::from_label_property(table_name, column_name);
let entities =
tx.iter_entities(Some(table_name)).map_err(|e| Error::Execution(e.to_string()))?;
let mut count = 0u64;
for entity in &entities {
if let Some(value) = entity.properties.get(column_name) {
if PropertyIndexEntry::is_indexable(value) {
let entry = PropertyIndexEntry::new(index_id, value.clone(), entity.id);
if let Some(key) = entry.encode_key() {
tx.put_property_index(&key).map_err(|e| Error::Execution(e.to_string()))?;
count += 1;
}
}
}
}
Ok(count)
}
fn build_hnsw_index<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
node: &CreateIndexNode,
) -> Result<()> {
use crate::vector::HnswIndexBuilder;
use manifoldb_vector::distance::DistanceMetric;
let column_name =
node.columns.first().map(|c| extract_column_name_from_expr(&c.expr)).ok_or_else(|| {
Error::Execution("HNSW index requires exactly one column".to_string())
})?;
let mut builder = HnswIndexBuilder::new(&node.name, &node.table, column_name);
for (key, value) in &node.with {
let key_lower = key.to_lowercase();
match key_lower.as_str() {
"m" => {
if let Ok(m) = value.parse::<usize>() {
builder = builder.m(m);
}
}
"ef_construction" => {
if let Ok(ef) = value.parse::<usize>() {
builder = builder.ef_construction(ef);
}
}
"ef_search" => {
if let Ok(ef) = value.parse::<usize>() {
builder = builder.ef_search(ef);
}
}
"dimension" | "dimensions" => {
if let Ok(dim) = value.parse::<usize>() {
builder = builder.dimension(dim);
}
}
"distance" | "metric" | "distance_metric" => {
let metric = match value.to_lowercase().as_str() {
"euclidean" | "l2" => DistanceMetric::Euclidean,
"cosine" => DistanceMetric::Cosine,
"dot" | "inner_product" | "ip" => DistanceMetric::DotProduct,
_ => DistanceMetric::Cosine, };
builder = builder.distance_metric(metric);
}
_ => {
}
}
}
builder.build(tx).map_err(|e| Error::Execution(e.to_string()))
}
/// Returns the column name an index expression refers to.
///
/// For a (possibly qualified) column reference this is the last name part,
/// or an empty string if there are no parts. For any other expression it is
/// the expression's `Debug` rendering.
fn extract_column_name_from_expr(expr: &manifoldb_query::ast::Expr) -> String {
    if let manifoldb_query::ast::Expr::Column(qualified) = expr {
        return match qualified.parts.last() {
            Some(part) => part.name.clone(),
            None => String::new(),
        };
    }
    format!("{expr:?}")
}
/// Drops each index in `node.names` and returns how many B-tree index
/// entries were removed.
///
/// For every name, these steps run in order:
/// 1. The index schema is looked up.
/// 2. Any vector index with that name is dropped.
/// 3. If the index exists and is not `USING hnsw`, its B-tree entries are
///    deleted.
/// 4. The index is removed from the schema, honouring `IF EXISTS`.
fn execute_drop_index<T: Transaction>(
    tx: &mut DatabaseTransaction<T>,
    node: &DropIndexNode,
) -> Result<u64> {
    let mut removed_entries = 0u64;
    for index_name in &node.names {
        let existing = SchemaManager::get_index(tx, index_name)
            .map_err(|e| Error::Execution(e.to_string()))?;
        crate::vector::drop_index(tx, index_name, true)
            .map_err(|e| Error::Execution(format!("failed to drop vector index: {e}")))?;

        let btree_schema = existing.filter(|schema| {
            !schema.using.as_ref().is_some_and(|u| u.eq_ignore_ascii_case("hnsw"))
        });
        if let Some(schema) = btree_schema {
            removed_entries += cleanup_btree_index_entries(tx, &schema)?;
        }

        SchemaManager::drop_index(tx, index_name, node.if_exists)
            .map_err(|e| Error::Execution(e.to_string()))?;
    }
    Ok(removed_entries)
}
fn cleanup_btree_index_entries<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
schema: &crate::schema::IndexSchema,
) -> Result<u64> {
use manifoldb_core::index::{IndexId, PropertyIndexScan};
if schema.columns.len() != 1 {
return Ok(0);
}
let column_name = &schema.columns[0].expr;
let table_name = &schema.table;
let index_id = IndexId::from_label_property(table_name, column_name);
let (start, end) = PropertyIndexScan::full_index_range(index_id);
let deleted = tx
.delete_property_index_range(&start, &end)
.map_err(|e| Error::Execution(e.to_string()))?;
Ok(deleted as u64)
}
fn execute_create_collection<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
node: &CreateCollectionNode,
) -> Result<u64> {
use crate::collection::{
CollectionManager, CollectionName, DistanceType, HnswParams, IndexConfig,
InvertedIndexParams, VectorConfig, VectorType,
};
use manifoldb_vector::distance::DistanceMetric;
let mut vector_configs = Vec::new();
for vec_def in &node.vectors {
let vector_type = match &vec_def.vector_type {
manifoldb_query::ast::VectorTypeDef::Vector { dimension } => {
VectorType::Dense { dimension: *dimension as usize }
}
manifoldb_query::ast::VectorTypeDef::SparseVector { max_dimension } => {
VectorType::Sparse { max_dimension: max_dimension.unwrap_or(0) }
}
manifoldb_query::ast::VectorTypeDef::MultiVector { token_dim } => {
VectorType::Multi { token_dim: *token_dim as usize }
}
manifoldb_query::ast::VectorTypeDef::BinaryVector { bits } => {
VectorType::Binary { bits: *bits as usize }
}
};
let mut distance_metric = DistanceMetric::Cosine;
let mut m: Option<usize> = None;
let mut ef_construction: Option<usize> = None;
for (key, value) in &vec_def.with_options {
match key.as_str() {
"distance" => {
distance_metric = match value.to_lowercase().as_str() {
"euclidean" | "l2" => DistanceMetric::Euclidean,
"cosine" => DistanceMetric::Cosine,
"dot" | "dot_product" | "inner_product" => DistanceMetric::DotProduct,
_ => DistanceMetric::Cosine,
};
}
"m" => {
m = value.parse().ok();
}
"ef_construction" => {
ef_construction = value.parse().ok();
}
_ => {}
}
}
let index_config = match vec_def.using.as_ref().map(|u| u.to_lowercase()).as_deref() {
Some("hnsw") => {
let mut hnsw_params = HnswParams::default();
if let Some(m_val) = m {
hnsw_params = HnswParams::new(m_val);
}
if let Some(ef) = ef_construction {
hnsw_params = hnsw_params.with_ef_construction(ef);
}
IndexConfig::hnsw(hnsw_params)
}
Some("inverted") => IndexConfig::inverted(InvertedIndexParams::default()),
Some("flat") | None => IndexConfig::flat(),
Some(_) => IndexConfig::flat(),
};
let config = VectorConfig {
vector_type,
distance: DistanceType::Dense(distance_metric),
index: index_config,
};
vector_configs.push((vec_def.name.name.clone(), config));
}
let collection_name = CollectionName::new(&node.name)
.map_err(|e| Error::Execution(format!("invalid collection name: {e}")))?;
if node.if_not_exists && CollectionManager::exists(tx, &collection_name).unwrap_or(false) {
return Ok(0);
}
CollectionManager::create(tx, &collection_name, vector_configs)
.map_err(|e| Error::Execution(e.to_string()))?;
Ok(0)
}
fn execute_drop_collection<T: Transaction>(
tx: &mut DatabaseTransaction<T>,
node: &DropCollectionNode,
) -> Result<u64> {
use crate::collection::{CollectionManager, CollectionName};
for name in &node.names {
let collection_name = CollectionName::new(name)
.map_err(|e| Error::Execution(format!("invalid collection name: {e}")))?;
CollectionManager::delete(tx, &collection_name, node.if_exists)
.map_err(|e| Error::Execution(e.to_string()))?;
}
Ok(0)
}
/// Evaluates a scalar expression against a single entity.
///
/// Supported expressions:
/// - literals;
/// - columns, where `_rowid` is the entity id as an integer and any other
///   name is a property lookup (missing means `NULL`);
/// - bound parameters (missing means `NULL`);
/// - aliases;
/// - binary operators, via `evaluate_binary_op`.
///
/// Everything else evaluates to `Value::Null`.
fn evaluate_expr(expr: &LogicalExpr, entity: &Entity, ctx: &ExecutionContext) -> Value {
    match expr {
        LogicalExpr::Literal(lit) => literal_to_value(lit),
        LogicalExpr::Column { name, .. } if name == "_rowid" => {
            Value::Int(entity.id.as_u64() as i64)
        }
        LogicalExpr::Column { name, .. } => {
            entity.get_property(name).cloned().unwrap_or(Value::Null)
        }
        LogicalExpr::Parameter(idx) => match ctx.get_parameter(*idx as u32) {
            Some(bound) => bound.clone(),
            None => Value::Null,
        },
        LogicalExpr::Alias { expr: inner, .. } => evaluate_expr(inner, entity, ctx),
        LogicalExpr::BinaryOp { left, op, right } => evaluate_binary_op(
            op,
            &evaluate_expr(left, entity, ctx),
            &evaluate_expr(right, entity, ctx),
        ),
        _ => Value::Null,
    }
}
fn evaluate_literal_expr(expr: &LogicalExpr, ctx: &ExecutionContext) -> Result<Value> {
match expr {
LogicalExpr::Literal(lit) => Ok(literal_to_value(lit)),
LogicalExpr::Parameter(idx) => ctx
.get_parameter(*idx as u32)
.cloned()
.ok_or_else(|| Error::Execution(format!("missing parameter at index {}", idx))),
other => Err(Error::Execution(format!(
"unsupported expression type in VALUES clause: {:?}",
std::mem::discriminant(other)
))),
}
}
/// Converts an AST literal into the corresponding runtime `Value`.
fn literal_to_value(lit: &Literal) -> Value {
    match *lit {
        Literal::Null => Value::Null,
        Literal::Boolean(flag) => Value::Bool(flag),
        Literal::Integer(int) => Value::Int(int),
        Literal::Float(num) => Value::Float(num),
        Literal::String(ref text) => Value::String(text.clone()),
        Literal::Vector(ref dense) => Value::Vector(dense.clone()),
        Literal::MultiVector(ref multi) => Value::MultiVector(multi.clone()),
    }
}
/// Evaluates `expr` as a row filter against a single entity.
///
/// - Comparisons (`=`, `<>`, `<`, `<=`, `>`, `>=`) are never satisfied when
///   either operand is `NULL`, following SQL's three-valued logic collapsed
///   to "not true".
/// - `LIKE` applies only to two strings.
/// - `AND`, `OR`, and `NOT` recurse as predicates. `IS [NOT] NULL` tests the
///   operand value.
/// - `IN` uses `values_equal`, under which `NULL` equals `NULL`.
/// - `BETWEEN` is inclusive and uses `compare_values`.
/// - A bare column or bound parameter (e.g. `WHERE active`) is satisfied
///   only when it evaluates to boolean `true`. Previously it fell into the
///   catch-all and matched every row.
/// - Any other expression shape is treated permissively as satisfied.
fn evaluate_predicate(expr: &LogicalExpr, entity: &Entity, ctx: &ExecutionContext) -> bool {
    use manifoldb_query::ast::{BinaryOp, UnaryOp};
    use std::cmp::Ordering;

    match expr {
        LogicalExpr::Literal(Literal::Boolean(b)) => *b,
        LogicalExpr::BinaryOp { left, op, right } => match op {
            // Logical connectives recurse as predicates; their operands are
            // not needed as scalar values.
            BinaryOp::And => {
                evaluate_predicate(left, entity, ctx) && evaluate_predicate(right, entity, ctx)
            }
            BinaryOp::Or => {
                evaluate_predicate(left, entity, ctx) || evaluate_predicate(right, entity, ctx)
            }
            _ => {
                let lval = evaluate_expr(left, entity, ctx);
                let rval = evaluate_expr(right, entity, ctx);
                let has_null = matches!(lval, Value::Null) || matches!(rval, Value::Null);
                match op {
                    BinaryOp::Eq => !has_null && values_equal(&lval, &rval),
                    BinaryOp::NotEq => !has_null && !values_equal(&lval, &rval),
                    BinaryOp::Lt => !has_null && compare_values(&lval, &rval) == Ordering::Less,
                    BinaryOp::LtEq => {
                        !has_null && compare_values(&lval, &rval) != Ordering::Greater
                    }
                    BinaryOp::Gt => {
                        !has_null && compare_values(&lval, &rval) == Ordering::Greater
                    }
                    BinaryOp::GtEq => !has_null && compare_values(&lval, &rval) != Ordering::Less,
                    BinaryOp::Like => match (&lval, &rval) {
                        (Value::String(s), Value::String(pattern)) => simple_like_match(s, pattern),
                        _ => false,
                    },
                    _ => false,
                }
            }
        },
        LogicalExpr::UnaryOp { op, operand } => match op {
            UnaryOp::Not => !evaluate_predicate(operand, entity, ctx),
            UnaryOp::IsNull => matches!(evaluate_expr(operand, entity, ctx), Value::Null),
            UnaryOp::IsNotNull => !matches!(evaluate_expr(operand, entity, ctx), Value::Null),
            _ => false,
        },
        LogicalExpr::InList { expr, list, negated } => {
            let val = evaluate_expr(expr, entity, ctx);
            let in_list = list.iter().any(|item| {
                let item_val = evaluate_expr(item, entity, ctx);
                values_equal(&val, &item_val)
            });
            if *negated {
                !in_list
            } else {
                in_list
            }
        }
        LogicalExpr::Between { expr, low, high, negated } => {
            let val = evaluate_expr(expr, entity, ctx);
            let low_val = evaluate_expr(low, entity, ctx);
            let high_val = evaluate_expr(high, entity, ctx);
            let in_range = compare_values(&val, &low_val) != Ordering::Less
                && compare_values(&val, &high_val) != Ordering::Greater;
            if *negated {
                !in_range
            } else {
                in_range
            }
        }
        // A boolean column or parameter used directly as the predicate.
        LogicalExpr::Column { .. } | LogicalExpr::Parameter(_) => {
            matches!(evaluate_expr(expr, entity, ctx), Value::Bool(true))
        }
        // Unsupported predicate shapes do not filter anything out.
        _ => true,
    }
}
/// Evaluates a scalar binary operator on two already-evaluated operands.
///
/// Arithmetic (`+`, `-`, `*`, `/`) works on integers and floats; a mix of
/// int and float is promoted to float, and `+` also concatenates two
/// strings. Integer arithmetic is checked: overflow (including
/// `i64::MIN / -1`) and division by zero yield `Value::Null` instead of
/// panicking or silently wrapping. Float division by zero also yields
/// `NULL`.
///
/// Vector operators require two dense vectors of equal length:
/// - `EuclideanDistance` is the L2 distance.
/// - `CosineDistance` is `1 - cos(a, b)`, or `f64::MAX` if either vector has
///   zero norm.
/// - `InnerProduct` returns the *negated* dot product. Presumably this is so
///   that smaller means more similar, like the distance operators.
///
/// Every other operator or operand combination yields `Value::Null`.
fn evaluate_binary_op(op: &manifoldb_query::ast::BinaryOp, lval: &Value, rval: &Value) -> Value {
    use manifoldb_query::ast::BinaryOp;
    match op {
        BinaryOp::Add => match (lval, rval) {
            (Value::Int(a), Value::Int(b)) => a.checked_add(*b).map_or(Value::Null, Value::Int),
            (Value::Float(a), Value::Float(b)) => Value::Float(a + b),
            (Value::Int(a), Value::Float(b)) => Value::Float(*a as f64 + b),
            (Value::Float(a), Value::Int(b)) => Value::Float(a + *b as f64),
            (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
            _ => Value::Null,
        },
        BinaryOp::Sub => match (lval, rval) {
            (Value::Int(a), Value::Int(b)) => a.checked_sub(*b).map_or(Value::Null, Value::Int),
            (Value::Float(a), Value::Float(b)) => Value::Float(a - b),
            (Value::Int(a), Value::Float(b)) => Value::Float(*a as f64 - b),
            (Value::Float(a), Value::Int(b)) => Value::Float(a - *b as f64),
            _ => Value::Null,
        },
        BinaryOp::Mul => match (lval, rval) {
            (Value::Int(a), Value::Int(b)) => a.checked_mul(*b).map_or(Value::Null, Value::Int),
            (Value::Float(a), Value::Float(b)) => Value::Float(a * b),
            (Value::Int(a), Value::Float(b)) => Value::Float(*a as f64 * b),
            (Value::Float(a), Value::Int(b)) => Value::Float(a * *b as f64),
            _ => Value::Null,
        },
        BinaryOp::Div => match (lval, rval) {
            // `checked_div` is `None` both for a zero divisor and for
            // `i64::MIN / -1`, which would otherwise panic even in release.
            (Value::Int(a), Value::Int(b)) => a.checked_div(*b).map_or(Value::Null, Value::Int),
            (Value::Float(a), Value::Float(b)) if *b != 0.0 => Value::Float(a / b),
            (Value::Int(a), Value::Float(b)) if *b != 0.0 => Value::Float(*a as f64 / b),
            (Value::Float(a), Value::Int(b)) if *b != 0 => Value::Float(a / *b as f64),
            _ => Value::Null,
        },
        BinaryOp::EuclideanDistance => match (lval, rval) {
            (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => {
                let dist: f32 =
                    a.iter().zip(b.iter()).map(|(x, y)| (x - y).powi(2)).sum::<f32>().sqrt();
                Value::Float(f64::from(dist))
            }
            _ => Value::Null,
        },
        BinaryOp::CosineDistance => match (lval, rval) {
            (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => {
                let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
                let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
                let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
                if norm_a == 0.0 || norm_b == 0.0 {
                    // Undefined angle: treat as maximally distant.
                    Value::Float(f64::MAX)
                } else {
                    Value::Float(f64::from(1.0 - (dot / (norm_a * norm_b))))
                }
            }
            _ => Value::Null,
        },
        BinaryOp::InnerProduct => match (lval, rval) {
            (Value::Vector(a), Value::Vector(b)) if a.len() == b.len() => {
                let prod: f32 = -a.iter().zip(b.iter()).map(|(x, y)| x * y).sum::<f32>();
                Value::Float(f64::from(prod))
            }
            _ => Value::Null,
        },
        _ => Value::Null,
    }
}
/// Returns whether two values are considered equal by the executor.
///
/// - `Null` is equal to `Null`.
/// - `Int`/`Float` values compare numerically, including across the two variants.
///   The `Int` side is converted with `as f64`, so integers beyond 2^53 may round.
/// - Floats use an absolute `f64::EPSILON` tolerance. Exactly-equal floats,
///   including matching infinities, are always equal. NaN is never equal to anything.
/// - `String`, `Bytes`, `Vector` and `Bool` use exact equality.
/// - Any other combination of variants is unequal.
fn values_equal(a: &Value, b: &Value) -> bool {
    /// Tolerant float equality. The exact `==` check comes first because
    /// `inf - inf` is NaN, which would otherwise make `inf` unequal to itself.
    fn floats_equal(x: f64, y: f64) -> bool {
        x == y || (x - y).abs() < f64::EPSILON
    }

    match (a, b) {
        (Value::Null, Value::Null) => true,
        (Value::Bool(a), Value::Bool(b)) => a == b,
        (Value::Int(a), Value::Int(b)) => a == b,
        (Value::Float(a), Value::Float(b)) => floats_equal(*a, *b),
        (Value::Int(a), Value::Float(b)) => floats_equal(*a as f64, *b),
        (Value::Float(a), Value::Int(b)) => floats_equal(*a, *b as f64),
        (Value::String(a), Value::String(b)) => a == b,
        (Value::Bytes(a), Value::Bytes(b)) => a == b,
        (Value::Vector(a), Value::Vector(b)) => a == b,
        _ => false,
    }
}
/// Orders two values for sorting.
///
/// `Null` sorts before every other value and ties with another `Null`.
/// `Int`/`Float` compare numerically, with `Int` converted via `as f64` when the
/// variants differ. If either float is NaN, the result is `Equal`. Strings and
/// booleans use their natural ordering. Every other pairing of variants is
/// reported as `Equal`.
fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
    use std::cmp::Ordering;

    // NULL handling comes first so that it wins over every typed comparison below.
    match (matches!(a, Value::Null), matches!(b, Value::Null)) {
        (true, true) => return Ordering::Equal,
        (true, false) => return Ordering::Less,
        (false, true) => return Ordering::Greater,
        (false, false) => {}
    }

    // `partial_cmp` yields `None` for NaN; that case is collapsed to `Equal`.
    let by_float = |lhs: f64, rhs: f64| lhs.partial_cmp(&rhs).unwrap_or(Ordering::Equal);

    match (a, b) {
        (Value::Int(lhs), Value::Int(rhs)) => lhs.cmp(rhs),
        (Value::Float(lhs), Value::Float(rhs)) => by_float(*lhs, *rhs),
        (Value::Int(lhs), Value::Float(rhs)) => by_float(*lhs as f64, *rhs),
        (Value::Float(lhs), Value::Int(rhs)) => by_float(*lhs, *rhs as f64),
        (Value::String(lhs), Value::String(rhs)) => lhs.cmp(rhs),
        (Value::Bool(lhs), Value::Bool(rhs)) => lhs.cmp(rhs),
        _ => Ordering::Equal,
    }
}
/// Matches `s` against a SQL `LIKE` pattern.
///
/// `%` matches any run of characters, including an empty one. `_` matches exactly
/// one character. Every other pattern character must match literally and
/// case-sensitively. Characters are Unicode scalar values (`char`), not bytes.
/// Escape sequences are not supported, so `%` and `_` are always wildcards.
///
/// This is the standard wildcard dynamic program, but only two rows of the table
/// are kept. Memory is O(pattern length) rather than
/// O(len(s) × len(pattern)), with a fixed number of allocations per call.
fn simple_like_match(s: &str, pattern: &str) -> bool {
    let pat: Vec<char> = pattern.chars().collect();
    let width = pat.len() + 1;

    // `prev[j]` is true when the characters of `s` consumed so far match the first
    // `j` pattern characters. Before anything is consumed, only a pattern prefix
    // made entirely of `%` can match the empty string.
    let mut prev = vec![false; width];
    prev[0] = true;
    for (j, &p) in pat.iter().enumerate() {
        if p != '%' {
            break;
        }
        prev[j + 1] = prev[j];
    }

    let mut cur = vec![false; width];
    for c in s.chars() {
        // A non-empty prefix of `s` never matches the empty pattern prefix.
        // This reset is needed because `cur` holds the previous row after the swap.
        cur[0] = false;
        for (j, &p) in pat.iter().enumerate() {
            cur[j + 1] = match p {
                // `%` either matches nothing (same row) or absorbs `c` (previous row).
                '%' => cur[j] || prev[j + 1],
                '_' => prev[j],
                literal => prev[j] && c == literal,
            };
        }
        std::mem::swap(&mut prev, &mut cur);
    }

    prev[pat.len()]
}
/// Derives an output column name for a projected expression.
///
/// A column uses its own name and an alias uses the alias text. The wildcard
/// becomes `*`. Aggregate functions and literals use the `Debug` rendering of
/// the function / literal. Any other expression is named `"?"`.
fn expr_to_column_name(expr: &LogicalExpr) -> String {
    match expr {
        LogicalExpr::Wildcard => String::from("*"),
        LogicalExpr::Column { name: column, .. } => column.to_owned(),
        LogicalExpr::Alias { alias: label, .. } => label.to_owned(),
        LogicalExpr::AggregateFunction { func, .. } => format!("{:?}", func),
        LogicalExpr::Literal(value) => format!("{:?}", value),
        _ => String::from("?"),
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the pure value/pattern helpers in this module.
    use super::*;

    #[test]
    fn test_literal_to_value() {
        assert_eq!(literal_to_value(&Literal::Null), Value::Null);
        assert_eq!(literal_to_value(&Literal::Boolean(true)), Value::Bool(true));
        assert_eq!(literal_to_value(&Literal::Integer(42)), Value::Int(42));
        assert_eq!(literal_to_value(&Literal::Float(1.5)), Value::Float(1.5));
        assert_eq!(
            literal_to_value(&Literal::String("hello".to_string())),
            Value::String("hello".to_string())
        );
    }

    #[test]
    fn test_simple_like_match() {
        assert!(simple_like_match("hello", "hello"));
        assert!(simple_like_match("hello", "hel%"));
        assert!(simple_like_match("hello", "%llo"));
        assert!(simple_like_match("hello", "%ell%"));
        assert!(!simple_like_match("hello", "world"));
        assert!(!simple_like_match("hello", "hi%"));
        assert!(simple_like_match("hello", "h_llo"));
        assert!(simple_like_match("hello", "_ello"));
        assert!(simple_like_match("hello", "hell_"));
        assert!(simple_like_match("hello", "_____"));
        assert!(simple_like_match("Bob", "B_b"));
        assert!(!simple_like_match("hello", "h_lo"));
        assert!(!simple_like_match("hello", "______"));
        assert!(simple_like_match("hello", "h_%"));
        assert!(simple_like_match("hello", "%_o"));
    }

    #[test]
    fn test_simple_like_match_edge_cases() {
        // An empty subject only matches patterns made entirely of `%`.
        assert!(simple_like_match("", ""));
        assert!(simple_like_match("", "%"));
        assert!(simple_like_match("", "%%"));
        assert!(!simple_like_match("", "_"));
        assert!(!simple_like_match("", "a%"));
        // An empty pattern only matches the empty subject.
        assert!(!simple_like_match("abc", ""));
        // `_` consumes one `char`, not one byte.
        assert!(simple_like_match("héllo", "h_llo"));
        // Literal characters are compared case-sensitively.
        assert!(!simple_like_match("Hello", "hello"));
    }

    #[test]
    fn test_values_equal() {
        assert!(values_equal(&Value::Int(42), &Value::Int(42)));
        assert!(values_equal(&Value::Float(1.5), &Value::Float(1.5)));
        assert!(values_equal(&Value::Int(42), &Value::Float(42.0)));
        assert!(values_equal(&Value::Float(42.0), &Value::Int(42)));
        assert!(!values_equal(&Value::Int(42), &Value::Int(43)));
        assert!(values_equal(&Value::Null, &Value::Null));
        assert!(!values_equal(&Value::Null, &Value::Int(0)));
        assert!(!values_equal(&Value::Bool(true), &Value::Int(1)));
        assert!(values_equal(
            &Value::String("a".to_string()),
            &Value::String("a".to_string())
        ));
        assert!(!values_equal(
            &Value::String("a".to_string()),
            &Value::String("b".to_string())
        ));
    }

    #[test]
    fn test_compare_values() {
        use std::cmp::Ordering;
        assert_eq!(compare_values(&Value::Int(1), &Value::Int(2)), Ordering::Less);
        assert_eq!(compare_values(&Value::Int(2), &Value::Int(1)), Ordering::Greater);
        assert_eq!(compare_values(&Value::Int(1), &Value::Int(1)), Ordering::Equal);
        assert_eq!(compare_values(&Value::Null, &Value::Int(1)), Ordering::Less);
        assert_eq!(compare_values(&Value::Int(1), &Value::Null), Ordering::Greater);
        assert_eq!(compare_values(&Value::Null, &Value::Null), Ordering::Equal);
        // Mixed Int/Float comparisons are numeric.
        assert_eq!(compare_values(&Value::Int(1), &Value::Float(1.5)), Ordering::Less);
        assert_eq!(compare_values(&Value::Float(2.5), &Value::Int(2)), Ordering::Greater);
        assert_eq!(
            compare_values(&Value::String("a".to_string()), &Value::String("b".to_string())),
            Ordering::Less
        );
        assert_eq!(compare_values(&Value::Bool(false), &Value::Bool(true)), Ordering::Less);
    }
}