use std::collections::HashMap;
use std::sync::Arc;
use grafeo_common::grafeo_debug_span;
use grafeo_common::types::{EpochId, TransactionId, Value};
use grafeo_common::utils::error::{Error, Result};
use grafeo_core::graph::GraphStoreMut;
use grafeo_core::graph::lpg::LpgStore;
use crate::catalog::Catalog;
use crate::database::QueryResult;
use crate::query::binder::Binder;
use crate::query::executor::Executor;
use crate::query::optimizer::Optimizer;
use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
use crate::query::planner::Planner;
use crate::transaction::TransactionManager;
/// Query languages that a `QueryProcessor` can be asked to run.
///
/// Every variant is gated behind a Cargo feature, so the set of available
/// languages is decided at compile time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
/// Available with the `gql` feature.
#[cfg(feature = "gql")]
Gql,
/// Available with the `cypher` feature.
#[cfg(feature = "cypher")]
Cypher,
/// Available with the `gremlin` feature.
#[cfg(feature = "gremlin")]
Gremlin,
/// Available with the `graphql` feature.
#[cfg(feature = "graphql")]
GraphQL,
/// Available with the `sql-pgq` feature.
#[cfg(feature = "sql-pgq")]
SqlPgq,
/// Available with the `sparql` feature.
#[cfg(feature = "sparql")]
Sparql,
/// Available only when both the `graphql` and `rdf` features are enabled.
#[cfg(all(feature = "graphql", feature = "rdf"))]
GraphQLRdf,
}
impl QueryLanguage {
/// Returns `true` for the languages routed through the LPG pipeline and
/// `false` for `Sparql` and `GraphQLRdf`.
///
/// Each arm carries the same feature gate as its variant so the match stays
/// exhaustive for whichever variants are compiled in.
#[must_use]
pub const fn is_lpg(&self) -> bool {
match self {
#[cfg(feature = "gql")]
Self::Gql => true,
#[cfg(feature = "cypher")]
Self::Cypher => true,
#[cfg(feature = "gremlin")]
Self::Gremlin => true,
#[cfg(feature = "graphql")]
Self::GraphQL => true,
#[cfg(feature = "sql-pgq")]
Self::SqlPgq => true,
#[cfg(feature = "sparql")]
Self::Sparql => false,
#[cfg(all(feature = "graphql", feature = "rdf"))]
Self::GraphQLRdf => false,
}
}
}
/// Named query parameters: parameter name (without the `$` prefix used in error
/// messages) mapped to the value that replaces it in the logical plan.
pub type QueryParams = HashMap<String, Value>;
/// Runs queries end to end: translation to a logical plan, parameter
/// substitution, binding, optimization, physical planning and execution.
pub struct QueryProcessor {
// Concrete LPG store handed back by `lpg_store()`. The constructor that takes a
// generic graph store fills this with a freshly created store instead.
lpg_store: Arc<LpgStore>,
// Store handed to the planner and used when annotating EXPLAIN output.
graph_store: Arc<dyn GraphStoreMut>,
// Passed to the planner; also the source of the epoch when no transaction
// context is set.
transaction_manager: Arc<TransactionManager>,
// Catalog exposed through `catalog()`.
catalog: Arc<Catalog>,
// Optimizer applied to every logical plan before planning.
optimizer: Optimizer,
// Optional (viewing epoch, transaction id) pair set by `with_transaction_context`.
transaction_context: Option<(EpochId, TransactionId)>,
// RDF store required by the RDF pipeline; `None` unless set via `with_rdf`.
#[cfg(feature = "rdf")]
rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
impl QueryProcessor {
#[must_use]
pub fn for_lpg(store: Arc<LpgStore>) -> Self {
let optimizer = Optimizer::from_store(&store);
let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
Self {
lpg_store: store,
graph_store,
transaction_manager: Arc::new(TransactionManager::new()),
catalog: Arc::new(Catalog::new()),
optimizer,
transaction_context: None,
#[cfg(feature = "rdf")]
rdf_store: None,
}
}
/// Creates a processor over `store` that shares the caller's transaction manager.
///
/// The same store backs both the concrete LPG handle and the trait-object handle
/// used for planning. The optimizer is derived from the store, the catalog is
/// new, and no transaction context is set.
#[must_use]
pub fn for_lpg_with_transaction(
    store: Arc<LpgStore>,
    transaction_manager: Arc<TransactionManager>,
) -> Self {
    // Second handle to the same store, viewed through the generic trait.
    let dyn_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
    let store_optimizer = Optimizer::from_store(&store);

    Self {
        lpg_store: store,
        graph_store: dyn_store,
        transaction_manager,
        catalog: Arc::new(Catalog::new()),
        optimizer: store_optimizer,
        transaction_context: None,
        #[cfg(feature = "rdf")]
        rdf_store: None,
    }
}
/// Creates a processor over an arbitrary `GraphStoreMut` implementation.
///
/// Queries are planned against `store`. The `lpg_store` field is filled with a
/// newly created `LpgStore`, so `lpg_store()` does not return the store passed in
/// here.
///
/// # Errors
///
/// Returns the error from `LpgStore::new` if the placeholder store cannot be created.
pub fn for_graph_store_with_transaction(
    store: Arc<dyn GraphStoreMut>,
    transaction_manager: Arc<TransactionManager>,
) -> Result<Self> {
    let optimizer = Optimizer::from_graph_store(&*store);
    // Placeholder only: the generic store cannot be stored as a concrete `LpgStore`.
    let placeholder_lpg = Arc::new(LpgStore::new()?);

    let processor = Self {
        lpg_store: placeholder_lpg,
        graph_store: store,
        transaction_manager,
        catalog: Arc::new(Catalog::new()),
        optimizer,
        transaction_context: None,
        #[cfg(feature = "rdf")]
        rdf_store: None,
    };
    Ok(processor)
}
/// Builder-style setter that attaches a viewing epoch and transaction id.
///
/// When set, the LPG pipeline plans with this epoch and transaction id instead of
/// the transaction manager's current epoch, and never marks the plan read-only.
#[must_use]
pub fn with_transaction_context(
mut self,
viewing_epoch: EpochId,
transaction_id: TransactionId,
) -> Self {
self.transaction_context = Some((viewing_epoch, transaction_id));
self
}
/// Builder-style setter that replaces the processor's catalog.
#[must_use]
pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
self.catalog = catalog;
self
}
/// Builder-style setter that replaces the optimizer derived from the store.
#[must_use]
pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
self.optimizer = optimizer;
self
}
/// Runs `query`, written in `language`, optionally substituting `params`.
///
/// Languages for which `QueryLanguage::is_lpg` is true go through the LPG
/// pipeline. Everything else is treated as an RDF language: with the `rdf`
/// feature it is handed to the RDF pipeline, without it an error is returned.
///
/// # Errors
///
/// Propagates any error from the selected pipeline, or returns
/// `Error::Internal` when an RDF language is requested in a build without the
/// `rdf` feature.
pub fn process(
    &self,
    query: &str,
    language: QueryLanguage,
    params: Option<&QueryParams>,
) -> Result<QueryResult> {
    if language.is_lpg() {
        return self.process_lpg(query, language, params);
    }

    // Exactly one of the two blocks below is compiled in and forms the result.
    #[cfg(feature = "rdf")]
    {
        self.process_rdf(query, language, params)
    }
    #[cfg(not(feature = "rdf"))]
    {
        Err(Error::Internal(
            "RDF support not enabled. Compile with --features rdf".to_string(),
        ))
    }
}
/// Runs an LPG query: translate, substitute parameters, bind, optimize, then
/// either return the EXPLAIN output or plan and execute.
///
/// On success `rows_scanned` is set to the number of rows in the result, and
/// (except on `wasm32`) `execution_time_ms` to the wall-clock time spent in this
/// method.
///
/// # Errors
///
/// Propagates errors from translation, parameter substitution, binding,
/// optimization, planning and execution.
fn process_lpg(
    &self,
    query: &str,
    language: QueryLanguage,
    params: Option<&QueryParams>,
) -> Result<QueryResult> {
    #[cfg(not(target_arch = "wasm32"))]
    let start_time = std::time::Instant::now();

    let mut logical_plan = self.translate_lpg(query, language)?;
    if let Some(params) = params {
        substitute_params(&mut logical_plan, params)?;
    }

    // Binding only needs to succeed; its context is not used afterwards.
    let mut binder = Binder::new();
    let _binding_context = binder.bind(&logical_plan)?;

    let mut optimized_plan = self.optimizer.optimize(logical_plan)?;

    // EXPLAIN: annotate the plan with pushdown hints and return its rendering
    // without planning or executing anything.
    if optimized_plan.explain {
        annotate_pushdown_hints(&mut optimized_plan.root, self.graph_store.as_ref());
        return Ok(explain_result(&optimized_plan));
    }

    // The read-only flag is only set for mutation-free plans that run outside
    // an explicit transaction context.
    let has_mutations = optimized_plan.root.has_mutations();
    let is_read_only = !has_mutations && self.transaction_context.is_none();

    // With a transaction context, plan at its epoch; otherwise plan at the
    // transaction manager's current epoch with no transaction id.
    let (transaction_id, epoch) = match self.transaction_context {
        Some((viewing_epoch, id)) => (Some(id), viewing_epoch),
        None => (None, self.transaction_manager.current_epoch()),
    };
    let planner = Planner::with_context(
        Arc::clone(&self.graph_store),
        Arc::clone(&self.transaction_manager),
        transaction_id,
        epoch,
    )
    .with_read_only(is_read_only);

    let mut physical_plan = planner.plan(&optimized_plan)?;
    let executor = Executor::with_columns(physical_plan.columns.clone());
    let mut result = executor.execute(physical_plan.operator.as_mut())?;

    // `rows_scanned` is reported as the number of rows in the result.
    let rows_scanned = result.rows.len() as u64;
    #[cfg(not(target_arch = "wasm32"))]
    {
        result.execution_time_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
    }
    result.rows_scanned = Some(rows_scanned);

    Ok(result)
}
/// Translates `query` into a logical plan using the translator for `language`.
///
/// # Errors
///
/// Propagates translator errors, and returns `Error::Internal` for any language
/// that has no LPG translator arm here.
fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
    let _span = grafeo_debug_span!("grafeo::query::parse", ?language);

    match language {
        #[cfg(feature = "gql")]
        QueryLanguage::Gql => crate::query::translators::gql::translate(query),
        #[cfg(feature = "cypher")]
        QueryLanguage::Cypher => crate::query::translators::cypher::translate(query),
        #[cfg(feature = "gremlin")]
        QueryLanguage::Gremlin => crate::query::translators::gremlin::translate(query),
        #[cfg(feature = "graphql")]
        QueryLanguage::GraphQL => crate::query::translators::graphql::translate(query),
        #[cfg(feature = "sql-pgq")]
        QueryLanguage::SqlPgq => crate::query::translators::sql_pgq::translate(query),
        // With only LPG features enabled every variant is covered above, which
        // would otherwise make this arm trigger an unreachable-pattern lint.
        #[allow(unreachable_patterns)]
        _ => Err(Error::Internal(format!(
            "Language {language:?} is not an LPG language"
        ))),
    }
}
/// Returns the processor's concrete LPG store handle.
#[must_use]
pub fn lpg_store(&self) -> &Arc<LpgStore> {
&self.lpg_store
}
/// Returns the catalog held by this processor.
#[must_use]
pub fn catalog(&self) -> &Arc<Catalog> {
&self.catalog
}
/// Returns the optimizer applied to logical plans by this processor.
#[must_use]
pub fn optimizer(&self) -> &Optimizer {
&self.optimizer
}
}
impl QueryProcessor {
/// Returns the transaction manager this processor plans queries with.
#[must_use]
pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
&self.transaction_manager
}
}
#[cfg(feature = "rdf")]
impl QueryProcessor {
#[must_use]
pub fn with_rdf(
lpg_store: Arc<LpgStore>,
rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
) -> Self {
let optimizer = Optimizer::from_store(&lpg_store);
let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
Self {
lpg_store,
graph_store,
transaction_manager: Arc::new(TransactionManager::new()),
catalog: Arc::new(Catalog::new()),
optimizer,
transaction_context: None,
rdf_store: Some(rdf_store),
}
}
/// Returns the RDF store, or `None` when the processor was built without one.
#[must_use]
pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
self.rdf_store.as_ref()
}
fn process_rdf(
&self,
query: &str,
language: QueryLanguage,
params: Option<&QueryParams>,
) -> Result<QueryResult> {
use crate::query::planner::rdf::RdfPlanner;
let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
Error::Internal("RDF store not configured for this processor".to_string())
})?;
let mut logical_plan = self.translate_rdf(query, language)?;
if let Some(params) = params {
substitute_params(&mut logical_plan, params)?;
}
let mut binder = Binder::new();
let _binding_context = binder.bind(&logical_plan)?;
let optimized_plan = self.optimizer.optimize(logical_plan)?;
if optimized_plan.explain {
return Ok(explain_result(&optimized_plan));
}
let planner = RdfPlanner::new(Arc::clone(rdf_store));
let mut physical_plan = planner.plan(&optimized_plan)?;
let executor = Executor::with_columns(physical_plan.columns.clone());
executor.execute(physical_plan.operator.as_mut())
}
/// Translates `query` into a logical plan using the RDF translator for `language`.
///
/// GraphQL-over-RDF queries are translated with the fixed string
/// `"http://example.org/"` as the translator's second argument.
///
/// # Errors
///
/// Propagates translator errors, and returns `Error::Internal` for any language
/// that has no RDF translator arm here.
fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
    match language {
        #[cfg(feature = "sparql")]
        QueryLanguage::Sparql => crate::query::translators::sparql::translate(query),
        #[cfg(all(feature = "graphql", feature = "rdf"))]
        QueryLanguage::GraphQLRdf => {
            crate::query::translators::graphql_rdf::translate(query, "http://example.org/")
        }
        _ => Err(Error::Internal(format!(
            "Language {language:?} is not an RDF language"
        ))),
    }
}
}
/// Walks the operator tree and records a pushdown hint on every filter whose
/// direct input is a node scan.
///
/// Children are visited before the filter itself is annotated. Operators that
/// are not listed below are left alone and their subtrees are not visited.
pub(crate) fn annotate_pushdown_hints(
    op: &mut LogicalOperator,
    store: &dyn grafeo_core::graph::GraphStore,
) {
    #[allow(clippy::wildcard_imports)]
    use crate::query::plan::*;

    match op {
        LogicalOperator::Filter(filter) => {
            annotate_pushdown_hints(&mut filter.input, store);
            // Only a filter sitting directly on a node scan gets a hint.
            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
            }
        }

        // Scans with an optional upstream operator.
        LogicalOperator::NodeScan(scan) => {
            if let Some(child) = scan.input.as_mut() {
                annotate_pushdown_hints(child, store);
            }
        }
        LogicalOperator::EdgeScan(scan) => {
            if let Some(child) = scan.input.as_mut() {
                annotate_pushdown_hints(child, store);
            }
        }

        // Operators with exactly one input.
        LogicalOperator::Expand(node) => annotate_pushdown_hints(&mut node.input, store),
        LogicalOperator::Project(node) => annotate_pushdown_hints(&mut node.input, store),
        LogicalOperator::Aggregate(node) => annotate_pushdown_hints(&mut node.input, store),
        LogicalOperator::Limit(node) => annotate_pushdown_hints(&mut node.input, store),
        LogicalOperator::Skip(node) => annotate_pushdown_hints(&mut node.input, store),
        LogicalOperator::Sort(node) => annotate_pushdown_hints(&mut node.input, store),
        LogicalOperator::Distinct(node) => annotate_pushdown_hints(&mut node.input, store),
        LogicalOperator::Return(node) => annotate_pushdown_hints(&mut node.input, store),

        // Operators with several inputs.
        LogicalOperator::Join(join) => {
            annotate_pushdown_hints(&mut join.left, store);
            annotate_pushdown_hints(&mut join.right, store);
        }
        LogicalOperator::Otherwise(otherwise) => {
            annotate_pushdown_hints(&mut otherwise.left, store);
            annotate_pushdown_hints(&mut otherwise.right, store);
        }
        LogicalOperator::Apply(apply) => {
            annotate_pushdown_hints(&mut apply.input, store);
            annotate_pushdown_hints(&mut apply.subplan, store);
        }
        LogicalOperator::Union(union) => {
            union
                .inputs
                .iter_mut()
                .for_each(|child| annotate_pushdown_hints(child, store));
        }

        _ => {}
    }
}
/// Chooses the pushdown hint for `predicate` applied directly on top of `scan`.
///
/// * Equality or range comparison touching a property of the scanned variable:
///   `IndexLookup` / `RangeScan` when the store has an index on that property,
///   otherwise `LabelFirst` when the scan has a label, otherwise `None`.
/// * Equality or range comparison that does not touch such a property: `None`.
/// * `AND`: decided by its left operand only.
/// * Anything else: `LabelFirst` when the scan has a label, otherwise `None`.
fn infer_pushdown(
    predicate: &LogicalExpression,
    scan: &crate::query::plan::NodeScanOp,
    store: &dyn grafeo_core::graph::GraphStore,
) -> Option<crate::query::plan::PushdownHint> {
    #[allow(clippy::wildcard_imports)]
    use crate::query::plan::*;

    match predicate {
        LogicalExpression::Binary {
            left,
            op: BinaryOp::Eq,
            right,
        } => {
            // No property of the scanned variable on either side: no hint at all.
            let property = extract_property_name(left, &scan.variable)
                .or_else(|| extract_property_name(right, &scan.variable))?;
            if store.has_property_index(&property) {
                Some(PushdownHint::IndexLookup { property })
            } else {
                scan.label.is_some().then_some(PushdownHint::LabelFirst)
            }
        }
        LogicalExpression::Binary {
            left,
            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
            right,
        } => {
            let property = extract_property_name(left, &scan.variable)
                .or_else(|| extract_property_name(right, &scan.variable))?;
            if store.has_property_index(&property) {
                Some(PushdownHint::RangeScan { property })
            } else {
                scan.label.is_some().then_some(PushdownHint::LabelFirst)
            }
        }
        // Only the first conjunct is inspected.
        LogicalExpression::Binary {
            left,
            op: BinaryOp::And,
            ..
        } => infer_pushdown(left, scan, store),
        _ => scan.label.is_some().then_some(PushdownHint::LabelFirst),
    }
}
/// Returns the property name when `expr` is a property access on `scan_var`,
/// and `None` for any other expression or any other variable.
fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
    match expr {
        LogicalExpression::Property { variable, property } if variable == scan_var => {
            Some(property.clone())
        }
        _ => None,
    }
}
/// Wraps the textual rendering of `plan` in a query result.
///
/// The result has a single string column named `plan` and a single row holding
/// the output of `explain_tree`. No timing or row metrics are recorded and the
/// status is `SUCCESS`.
pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
    let rendered_plan = Value::String(plan.root.explain_tree().into());
    let single_row = vec![rendered_plan];

    QueryResult {
        columns: vec!["plan".to_string()],
        column_types: vec![grafeo_common::types::LogicalType::String],
        rows: vec![single_row],
        execution_time_ms: None,
        rows_scanned: None,
        status_message: None,
        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
    }
}
/// Replaces parameter references in `plan` with the values from `params`,
/// starting at the plan's root operator and modifying the plan in place.
///
/// # Errors
///
/// Returns an error when the plan references a parameter that is not in
/// `params`, or when a SKIP/LIMIT parameter is not a non-negative integer.
pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
substitute_in_operator(&mut plan.root, params)
}
/// Recursively substitutes parameters in `op` and in every operator below it.
///
/// For each operator this visits the expressions it carries (predicates,
/// projections, property values, join conditions, ...) and then recurses into
/// its input operator(s). The match is exhaustive on purpose: adding a new
/// operator variant forces a decision here.
///
/// # Errors
///
/// Stops at, and returns, the first error raised while substituting an
/// expression or resolving a SKIP/LIMIT count.
fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
#[allow(clippy::wildcard_imports)]
use crate::query::plan::*;
match op {
LogicalOperator::Filter(filter) => {
substitute_in_expression(&mut filter.predicate, params)?;
substitute_in_operator(&mut filter.input, params)?;
}
LogicalOperator::Return(ret) => {
for item in &mut ret.items {
substitute_in_expression(&mut item.expression, params)?;
}
substitute_in_operator(&mut ret.input, params)?;
}
LogicalOperator::Project(proj) => {
for p in &mut proj.projections {
substitute_in_expression(&mut p.expression, params)?;
}
substitute_in_operator(&mut proj.input, params)?;
}
// Scans carry no expressions that are visited here; only an optional input.
LogicalOperator::NodeScan(scan) => {
if let Some(input) = &mut scan.input {
substitute_in_operator(input, params)?;
}
}
LogicalOperator::EdgeScan(scan) => {
if let Some(input) = &mut scan.input {
substitute_in_operator(input, params)?;
}
}
LogicalOperator::Expand(expand) => {
substitute_in_operator(&mut expand.input, params)?;
}
LogicalOperator::Join(join) => {
substitute_in_operator(&mut join.left, params)?;
substitute_in_operator(&mut join.right, params)?;
for cond in &mut join.conditions {
substitute_in_expression(&mut cond.left, params)?;
substitute_in_expression(&mut cond.right, params)?;
}
}
LogicalOperator::LeftJoin(join) => {
substitute_in_operator(&mut join.left, params)?;
substitute_in_operator(&mut join.right, params)?;
if let Some(cond) = &mut join.condition {
substitute_in_expression(cond, params)?;
}
}
LogicalOperator::Aggregate(agg) => {
for expr in &mut agg.group_by {
substitute_in_expression(expr, params)?;
}
for agg_expr in &mut agg.aggregates {
if let Some(expr) = &mut agg_expr.expression {
substitute_in_expression(expr, params)?;
}
}
substitute_in_operator(&mut agg.input, params)?;
}
LogicalOperator::Sort(sort) => {
for key in &mut sort.keys {
substitute_in_expression(&mut key.expression, params)?;
}
substitute_in_operator(&mut sort.input, params)?;
}
// LIMIT/SKIP counts go through `resolve_count_param`, which turns a
// parameter count into a literal count.
LogicalOperator::Limit(limit) => {
resolve_count_param(&mut limit.count, params)?;
substitute_in_operator(&mut limit.input, params)?;
}
LogicalOperator::Skip(skip) => {
resolve_count_param(&mut skip.count, params)?;
substitute_in_operator(&mut skip.input, params)?;
}
LogicalOperator::Distinct(distinct) => {
substitute_in_operator(&mut distinct.input, params)?;
}
LogicalOperator::CreateNode(create) => {
for (_, expr) in &mut create.properties {
substitute_in_expression(expr, params)?;
}
if let Some(input) = &mut create.input {
substitute_in_operator(input, params)?;
}
}
LogicalOperator::CreateEdge(create) => {
for (_, expr) in &mut create.properties {
substitute_in_expression(expr, params)?;
}
substitute_in_operator(&mut create.input, params)?;
}
LogicalOperator::DeleteNode(delete) => {
substitute_in_operator(&mut delete.input, params)?;
}
LogicalOperator::DeleteEdge(delete) => {
substitute_in_operator(&mut delete.input, params)?;
}
LogicalOperator::SetProperty(set) => {
for (_, expr) in &mut set.properties {
substitute_in_expression(expr, params)?;
}
substitute_in_operator(&mut set.input, params)?;
}
LogicalOperator::Union(union) => {
for input in &mut union.inputs {
substitute_in_operator(input, params)?;
}
}
LogicalOperator::AntiJoin(anti) => {
substitute_in_operator(&mut anti.left, params)?;
substitute_in_operator(&mut anti.right, params)?;
}
LogicalOperator::Bind(bind) => {
substitute_in_expression(&mut bind.expression, params)?;
substitute_in_operator(&mut bind.input, params)?;
}
LogicalOperator::TripleScan(scan) => {
if let Some(input) = &mut scan.input {
substitute_in_operator(input, params)?;
}
}
LogicalOperator::Unwind(unwind) => {
substitute_in_expression(&mut unwind.expression, params)?;
substitute_in_operator(&mut unwind.input, params)?;
}
LogicalOperator::MapCollect(mc) => {
substitute_in_operator(&mut mc.input, params)?;
}
// Both merge operators substitute in their match properties and in the
// ON CREATE / ON MATCH assignments before descending into the input.
LogicalOperator::Merge(merge) => {
for (_, expr) in &mut merge.match_properties {
substitute_in_expression(expr, params)?;
}
for (_, expr) in &mut merge.on_create {
substitute_in_expression(expr, params)?;
}
for (_, expr) in &mut merge.on_match {
substitute_in_expression(expr, params)?;
}
substitute_in_operator(&mut merge.input, params)?;
}
LogicalOperator::MergeRelationship(merge_rel) => {
for (_, expr) in &mut merge_rel.match_properties {
substitute_in_expression(expr, params)?;
}
for (_, expr) in &mut merge_rel.on_create {
substitute_in_expression(expr, params)?;
}
for (_, expr) in &mut merge_rel.on_match {
substitute_in_expression(expr, params)?;
}
substitute_in_operator(&mut merge_rel.input, params)?;
}
LogicalOperator::AddLabel(add_label) => {
substitute_in_operator(&mut add_label.input, params)?;
}
LogicalOperator::RemoveLabel(remove_label) => {
substitute_in_operator(&mut remove_label.input, params)?;
}
LogicalOperator::ShortestPath(sp) => {
substitute_in_operator(&mut sp.input, params)?;
}
LogicalOperator::InsertTriple(insert) => {
if let Some(ref mut input) = insert.input {
substitute_in_operator(input, params)?;
}
}
LogicalOperator::DeleteTriple(delete) => {
if let Some(ref mut input) = delete.input {
substitute_in_operator(input, params)?;
}
}
LogicalOperator::Modify(modify) => {
substitute_in_operator(&mut modify.where_clause, params)?;
}
// Graph-management operators: nothing is substituted.
LogicalOperator::ClearGraph(_)
| LogicalOperator::CreateGraph(_)
| LogicalOperator::DropGraph(_)
| LogicalOperator::LoadGraph(_)
| LogicalOperator::CopyGraph(_)
| LogicalOperator::MoveGraph(_)
| LogicalOperator::AddGraph(_) => {}
LogicalOperator::HorizontalAggregate(op) => {
substitute_in_operator(&mut op.input, params)?;
}
LogicalOperator::Empty => {}
LogicalOperator::VectorScan(scan) => {
substitute_in_expression(&mut scan.query_vector, params)?;
if let Some(ref mut input) = scan.input {
substitute_in_operator(input, params)?;
}
}
LogicalOperator::VectorJoin(join) => {
substitute_in_expression(&mut join.query_vector, params)?;
substitute_in_operator(&mut join.input, params)?;
}
LogicalOperator::Except(except) => {
substitute_in_operator(&mut except.left, params)?;
substitute_in_operator(&mut except.right, params)?;
}
LogicalOperator::Intersect(intersect) => {
substitute_in_operator(&mut intersect.left, params)?;
substitute_in_operator(&mut intersect.right, params)?;
}
LogicalOperator::Otherwise(otherwise) => {
substitute_in_operator(&mut otherwise.left, params)?;
substitute_in_operator(&mut otherwise.right, params)?;
}
LogicalOperator::Apply(apply) => {
substitute_in_operator(&mut apply.input, params)?;
substitute_in_operator(&mut apply.subplan, params)?;
}
LogicalOperator::ParameterScan(_) => {}
LogicalOperator::MultiWayJoin(mwj) => {
for input in &mut mwj.inputs {
substitute_in_operator(input, params)?;
}
for cond in &mut mwj.conditions {
substitute_in_expression(&mut cond.left, params)?;
substitute_in_expression(&mut cond.right, params)?;
}
}
// NOTE(review): nothing inside these operators is visited, so any
// parameter they reference stays unsubstituted — confirm that is intended.
LogicalOperator::CreatePropertyGraph(_) => {}
LogicalOperator::CallProcedure(_) => {}
LogicalOperator::LoadData(_) => {}
}
Ok(())
}
fn resolve_count_param(
count: &mut crate::query::plan::CountExpr,
params: &QueryParams,
) -> Result<()> {
use crate::query::plan::CountExpr;
use grafeo_common::utils::error::{QueryError, QueryErrorKind};
if let CountExpr::Parameter(name) = count {
let value = params.get(name.as_str()).ok_or_else(|| {
Error::Query(QueryError::new(
QueryErrorKind::Semantic,
format!("Missing parameter for SKIP/LIMIT: ${name}"),
))
})?;
let n = match value {
Value::Int64(i) if *i >= 0 => *i as usize,
Value::Int64(i) => {
return Err(Error::Query(QueryError::new(
QueryErrorKind::Semantic,
format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
)));
}
other => {
return Err(Error::Query(QueryError::new(
QueryErrorKind::Semantic,
format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
)));
}
};
*count = CountExpr::Literal(n);
}
Ok(())
}
fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
use crate::query::plan::LogicalExpression;
match expr {
LogicalExpression::Parameter(name) => {
if let Some(value) = params.get(name) {
*expr = LogicalExpression::Literal(value.clone());
} else {
return Err(Error::Internal(format!("Missing parameter: ${}", name)));
}
}
LogicalExpression::Binary { left, right, .. } => {
substitute_in_expression(left, params)?;
substitute_in_expression(right, params)?;
}
LogicalExpression::Unary { operand, .. } => {
substitute_in_expression(operand, params)?;
}
LogicalExpression::FunctionCall { args, .. } => {
for arg in args {
substitute_in_expression(arg, params)?;
}
}
LogicalExpression::List(items) => {
for item in items {
substitute_in_expression(item, params)?;
}
}
LogicalExpression::Map(pairs) => {
for (_, value) in pairs {
substitute_in_expression(value, params)?;
}
}
LogicalExpression::IndexAccess { base, index } => {
substitute_in_expression(base, params)?;
substitute_in_expression(index, params)?;
}
LogicalExpression::SliceAccess { base, start, end } => {
substitute_in_expression(base, params)?;
if let Some(s) = start {
substitute_in_expression(s, params)?;
}
if let Some(e) = end {
substitute_in_expression(e, params)?;
}
}
LogicalExpression::Case {
operand,
when_clauses,
else_clause,
} => {
if let Some(op) = operand {
substitute_in_expression(op, params)?;
}
for (cond, result) in when_clauses {
substitute_in_expression(cond, params)?;
substitute_in_expression(result, params)?;
}
if let Some(el) = else_clause {
substitute_in_expression(el, params)?;
}
}
LogicalExpression::Property { .. }
| LogicalExpression::Variable(_)
| LogicalExpression::Literal(_)
| LogicalExpression::Labels(_)
| LogicalExpression::Type(_)
| LogicalExpression::Id(_) => {}
LogicalExpression::ListComprehension {
list_expr,
filter_expr,
map_expr,
..
} => {
substitute_in_expression(list_expr, params)?;
if let Some(filter) = filter_expr {
substitute_in_expression(filter, params)?;
}
substitute_in_expression(map_expr, params)?;
}
LogicalExpression::ListPredicate {
list_expr,
predicate,
..
} => {
substitute_in_expression(list_expr, params)?;
substitute_in_expression(predicate, params)?;
}
LogicalExpression::ExistsSubquery(_)
| LogicalExpression::CountSubquery(_)
| LogicalExpression::ValueSubquery(_) => {
}
LogicalExpression::PatternComprehension { projection, .. } => {
substitute_in_expression(projection, params)?;
}
LogicalExpression::MapProjection { entries, .. } => {
for entry in entries {
if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
substitute_in_expression(expr, params)?;
}
}
}
LogicalExpression::Reduce {
initial,
list,
expression,
..
} => {
substitute_in_expression(initial, params)?;
substitute_in_expression(list, params)?;
substitute_in_expression(expression, params)?;
}
}
Ok(())
}
#[cfg(test)]
mod tests {
    //! End-to-end tests for the query processor: language classification,
    //! construction, plain queries and parameter substitution.

    use super::*;

    /// LPG languages report `true`, SPARQL reports `false`.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over an empty store exposes that empty store.
    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A label scan in GQL returns one row per matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();
        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// A label scan in Cypher returns one row per matching node.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();
        assert_eq!(result.row_count(), 1);
    }

    /// A parameter used in a WHERE comparison filters the result.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
        let processor = QueryProcessor::for_lpg(store);
        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that was not supplied is an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        let processor = QueryProcessor::for_lpg(store);
        let params = QueryParams::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// A parameterised filter combined with a property projection.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
        let processor = QueryProcessor::for_lpg(store);
        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));
        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();
        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    /// Two parameters joined with AND are both substituted.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );
        let processor = QueryProcessor::for_lpg(store);
        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));
        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();
        assert_eq!(result.row_count(), 1);
    }

    /// Despite its name, this exercises a single string parameter compared with
    /// `=`; no IN-list is involved in the query.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
        let processor = QueryProcessor::for_lpg(store);
        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));
        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();
        assert_eq!(result.row_count(), 1);
    }

    /// Integer parameter compared against an integer property.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
        let processor = QueryProcessor::for_lpg(store);
        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));
        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();
        assert_eq!(result.row_count(), 1);
    }

    /// An empty result still reports the projected column names.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();
        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// String parameter compared with `=` returns only the matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
        let processor = QueryProcessor::for_lpg(store);
        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));
        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();
        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}