use super::*;
use crate::storage::query::ast::{Expr, SelectItem};
use crate::storage::query::evaluator;
use crate::storage::query::sql_lowering::{
effective_table_filter, effective_table_group_by_exprs, effective_table_having_filter,
effective_table_projections,
};
mod aggregate;
mod aggregate_planner;
mod aggregate_pushdown_dispatch;
mod filter_compiled;
mod helpers;
mod hybrid;
mod indexed_scan;
mod join;
mod json_writers;
mod table;
mod vector;
pub(crate) use filter_compiled::CompiledEntityFilter;
pub(crate) use helpers::{
evaluate_entity_filter, evaluate_entity_filter_with_db, extract_entity_id_from_filter,
extract_zone_predicates, try_hash_eq_lookup,
};
pub(super) use hybrid::execute_runtime_hybrid_query;
pub(crate) use indexed_scan::try_sorted_index_lookup;
pub(super) use join::execute_runtime_join_query;
pub(super) use json_writers::execute_runtime_serialize_single_entity;
pub(super) use vector::execute_runtime_vector_query;
use aggregate::{execute_aggregate_query, has_aggregate_projections};
use table::{
execute_runtime_canonical_table_node, execute_runtime_canonical_table_query_indexed,
RuntimeTableExecutionContext,
};
pub(super) fn execute_runtime_table_query(
db: &RedDB,
query: &TableQuery,
index_store: Option<&super::index_store::IndexStore>,
) -> RedDBResult<UnifiedResult> {
let effective_projections = effective_table_projections(query);
let effective_filter = effective_table_filter(query);
let effective_group_by = effective_table_group_by_exprs(query);
let effective_having = effective_table_having_filter(query);
if table_query_is_implicit_scalar_select(query)
&& !has_aggregate_projections(&effective_projections)
&& effective_group_by.is_empty()
&& effective_having.is_none()
{
let source = UnifiedRecord::new();
let filter_matches = effective_filter.as_ref().is_none_or(|filter| {
super::join_filter::evaluate_runtime_filter_with_db(
Some(db),
&source,
filter,
None,
None,
)
});
let mut records = if filter_matches {
vec![project_scalar_via_evaluator(
Some(db),
&query.select_items,
&effective_projections,
&source,
)]
} else {
Vec::new()
};
if let Some(offset) = query.offset {
let offset = offset as usize;
if offset >= records.len() {
records.clear();
} else {
records.drain(..offset);
}
}
if let Some(limit) = query.limit {
records.truncate(limit as usize);
}
let columns = projected_columns(&records, &effective_projections);
return Ok(UnifiedResult {
columns,
records,
stats: Default::default(),
pre_serialized_json: None,
});
}
if has_aggregate_projections(&effective_projections) {
return execute_aggregate_query(db, query);
}
if effective_filter.is_some()
&& query.order_by.is_empty()
&& effective_group_by.is_empty()
&& effective_having.is_none()
&& query.expand.is_none()
&& query.offset.is_none()
&& !is_universal_query_source(&query.table)
{
if let Some(entity_id) = extract_entity_id_from_filter(&effective_filter) {
if let Some(entity) =
resolve_table_row_by_logical_id(db, &query.table, EntityId::new(entity_id))
{
let json = execute_runtime_serialize_single_entity(&entity);
let records: Vec<UnifiedRecord> = runtime_table_record_from_entity(entity)
.into_iter()
.collect();
let columns = projected_columns(&records, &effective_projections);
return Ok(UnifiedResult {
columns,
records,
stats: crate::storage::query::unified::QueryStats {
rows_scanned: 1,
..Default::default()
},
pre_serialized_json: Some(json),
});
}
return Ok(UnifiedResult::default());
}
}
let records = execute_runtime_canonical_table_query_indexed(db, query, index_store)?;
let columns = projected_columns(&records, &effective_projections);
Ok(UnifiedResult {
columns,
records,
stats: Default::default(),
pre_serialized_json: None,
})
}
fn resolve_table_row_by_logical_id(
db: &RedDB,
table: &str,
logical_id: EntityId,
) -> Option<UnifiedEntity> {
let store = db.store();
let snapshot = crate::runtime::impl_core::capture_current_snapshot();
if let Some(snapshot) = snapshot.as_ref() {
return store
.table_row_versions_by_logical_id(table, logical_id)
.into_iter()
.filter(|entity| {
crate::runtime::impl_core::entity_visible_with_context(Some(snapshot), entity)
})
.max_by_key(|entity| entity.xmin);
}
store.get_table_row_by_logical_id(table, logical_id)
}
pub(super) fn runtime_record_has_document_capability(record: &UnifiedRecord) -> bool {
record
.get("red_capabilities")
.and_then(|value| match value {
crate::storage::schema::Value::Text(value) => Some(value),
_ => None,
})
.map(|capabilities| {
capabilities
.split(',')
.any(|capability| capability.trim() == "document")
})
.unwrap_or(false)
}
fn table_query_is_implicit_scalar_select(query: &TableQuery) -> bool {
query.table == "any"
&& query.alias.is_none()
&& query.source.is_none()
&& !query.select_items.is_empty()
&& query
.select_items
.iter()
.all(select_item_is_source_free_scalar)
}
fn select_item_is_source_free_scalar(item: &SelectItem) -> bool {
match item {
SelectItem::Wildcard => false,
SelectItem::Expr { expr, .. } => expr_is_source_free(expr),
}
}
fn expr_is_source_free(expr: &Expr) -> bool {
match expr {
Expr::Literal { .. } | Expr::Parameter { .. } => true,
Expr::Column { .. } => false,
Expr::UnaryOp { operand, .. } => expr_is_source_free(operand),
Expr::BinaryOp { lhs, rhs, .. } => expr_is_source_free(lhs) && expr_is_source_free(rhs),
Expr::Cast { inner, .. } => expr_is_source_free(inner),
Expr::FunctionCall { name, args, .. } => {
if name.eq_ignore_ascii_case("CONFIG") {
return (1..=2).contains(&args.len())
&& expr_is_path_like(&args[0])
&& args.get(1).is_none_or(|expr| {
matches!(expr, Expr::Column { .. }) || expr_is_source_free(expr)
});
}
if name.eq_ignore_ascii_case("KV") {
return (2..=3).contains(&args.len())
&& expr_is_path_like(&args[0])
&& expr_is_path_like(&args[1])
&& args.get(2).is_none_or(|expr| {
matches!(expr, Expr::Column { .. }) || expr_is_source_free(expr)
});
}
args.iter().all(expr_is_source_free)
}
Expr::Case {
branches, else_, ..
} => {
branches
.iter()
.all(|(cond, value)| expr_is_source_free(cond) && expr_is_source_free(value))
&& else_.as_deref().is_none_or(expr_is_source_free)
}
Expr::IsNull { operand, .. } => expr_is_source_free(operand),
Expr::InList { target, values, .. } => {
expr_is_source_free(target) && values.iter().all(expr_is_source_free)
}
Expr::Between {
target, low, high, ..
} => expr_is_source_free(target) && expr_is_source_free(low) && expr_is_source_free(high),
Expr::Subquery { .. } => false,
}
}
fn expr_is_path_like(expr: &Expr) -> bool {
matches!(expr, Expr::Literal { .. } | Expr::Column { .. })
}
fn project_scalar_via_evaluator(
db: Option<&RedDB>,
items: &[SelectItem],
projections: &[Projection],
source: &UnifiedRecord,
) -> UnifiedRecord {
let empty_row: &dyn evaluator::Row = &|_: &FieldRef| -> Option<Value> { None };
let mut record = UnifiedRecord::new();
for (item, proj) in items.iter().zip(projections.iter()) {
let SelectItem::Expr { expr, .. } = item else {
continue;
};
let col_name = super::join_filter::projection_name(proj);
let value = match evaluator::evaluate(expr, empty_row) {
Ok(v) => v,
Err(_) => super::join_filter::project_runtime_record_with_db(
db,
source,
std::slice::from_ref(proj),
None,
None,
false,
false,
)
.get(col_name.as_str())
.cloned()
.unwrap_or(Value::Null),
};
record.set(&col_name, value);
}
record
}
pub(super) fn evaluate_runtime_document_filter(
record: &UnifiedRecord,
filter: &crate::storage::query::ast::Filter,
table_name: Option<&str>,
table_alias: Option<&str>,
) -> bool {
evaluate_runtime_filter(record, filter, table_name, table_alias)
}
pub(super) fn runtime_record_rank_score(record: &UnifiedRecord) -> f64 {
[
"_score",
"hybrid_score",
"final_score",
"score",
"graph_score",
"table_score",
"graph_match",
"vector_score",
"vector_similarity",
"structured_score",
"structured_match",
"text_relevance",
]
.into_iter()
.find_map(|field| record.get(field).and_then(runtime_value_number))
.unwrap_or(0.0)
}
pub(super) fn compare_runtime_ranked_records(
left: &UnifiedRecord,
right: &UnifiedRecord,
) -> Ordering {
runtime_record_rank_score(right)
.partial_cmp(&runtime_record_rank_score(left))
.unwrap_or(Ordering::Equal)
.then_with(|| runtime_record_identity_key(left).cmp(&runtime_record_identity_key(right)))
}
pub(super) fn execute_runtime_canonical_expr_node(
db: &RedDB,
node: &crate::storage::query::planner::CanonicalLogicalNode,
expr: &QueryExpr,
) -> RedDBResult<Vec<UnifiedRecord>> {
match expr {
QueryExpr::Table(table) => {
let table_name = table.table.as_str();
let table_alias = table.alias.as_deref().unwrap_or(table_name);
let context = RuntimeTableExecutionContext {
query: table,
table_name,
table_alias,
};
execute_runtime_canonical_table_node(db, node, &context)
}
QueryExpr::Graph(_) | QueryExpr::Path(_) => {
let graph = materialize_graph(db.store().as_ref())?;
let node_properties = materialize_graph_node_properties(db.store().as_ref())?;
let result =
crate::storage::query::unified::UnifiedExecutor::execute_on_with_node_properties(
&graph,
expr,
node_properties,
)
.map_err(|err| RedDBError::Query(err.to_string()))?;
Ok(result.records)
}
QueryExpr::Vector(vector) => Ok(execute_runtime_vector_query(db, vector)?.records),
QueryExpr::Hybrid(hybrid) => Ok(execute_runtime_hybrid_query(db, hybrid)?.records),
QueryExpr::Join(join) => join::execute_runtime_canonical_join_node(db, node, join),
other => Err(RedDBError::Query(format!(
"canonical join execution does not yet support {} child expressions",
query_expr_name(other)
))),
}
}