pub mod analysis;
mod builder;
mod config_helpers;
mod expression;
mod join_ops;
mod scan_ops;
mod udf;
pub mod vector_ops;
#[cfg(test)]
mod test_fixtures;
pub use analysis::{PlanningContext, QueryAnalysis, RelationshipInstance};
use crate::config::GraphConfig;
use crate::error::Result;
use crate::logical_plan::LogicalOperator;
use datafusion::logical_expr::LogicalPlan;
use lance_graph_catalog::GraphSourceCatalog;
use std::sync::Arc;
pub trait GraphPhysicalPlanner {
fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan>;
}
pub struct DataFusionPlanner {
pub(crate) config: GraphConfig,
pub(crate) catalog: Option<Arc<dyn GraphSourceCatalog>>,
}
impl DataFusionPlanner {
pub fn new(config: GraphConfig) -> Self {
Self {
config,
catalog: None,
}
}
pub fn with_catalog(config: GraphConfig, catalog: Arc<dyn GraphSourceCatalog>) -> Self {
Self {
config,
catalog: Some(catalog),
}
}
pub(crate) fn plan_error<E: std::fmt::Display>(
&self,
context: &str,
error: E,
) -> crate::error::GraphError {
crate::error::GraphError::PlanError {
message: format!("{}: {}", context, error),
location: snafu::Location::new(file!(), line!(), column!()),
}
}
}
impl GraphPhysicalPlanner for DataFusionPlanner {
fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan> {
let analysis = analysis::analyze(logical_plan)?;
let mut ctx = PlanningContext::new(&analysis);
self.build_operator(&mut ctx, logical_plan)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ast::{
BooleanExpression, ComparisonOperator, PropertyRef, PropertyValue, RelationshipDirection,
ValueExpression,
};
use crate::logical_plan::LogicalOperator;
use test_fixtures::{make_catalog, person_config, person_knows_config, person_scan};
#[test]
fn test_filter_preserves_error_context() {
let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());
let scan = person_scan("p");
let filter = LogicalOperator::Filter {
input: Box::new(scan),
predicate: BooleanExpression::Comparison {
left: ValueExpression::Property(PropertyRef {
variable: "p".to_string(),
property: "age".to_string(),
}),
operator: ComparisonOperator::GreaterThan,
right: ValueExpression::Literal(PropertyValue::Integer(30)),
},
};
let result = planner.plan(&filter);
assert!(result.is_ok(), "Valid filter should succeed");
}
#[test]
fn test_exists_on_relationship_property_is_qualified() {
let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());
let scan_a = person_scan("a");
let expand = LogicalOperator::Expand {
input: Box::new(scan_a),
source_variable: "a".to_string(),
target_variable: "b".to_string(),
target_label: "Person".to_string(),
relationship_types: vec!["KNOWS".to_string()],
direction: RelationshipDirection::Outgoing,
relationship_variable: Some("r".to_string()),
properties: Default::default(),
target_properties: Default::default(),
};
let pred = BooleanExpression::Exists(PropertyRef {
variable: "r".into(),
property: "src_person_id".into(),
});
let filter = LogicalOperator::Filter {
input: Box::new(expand),
predicate: pred,
};
let df_plan = planner.plan(&filter).unwrap();
let s = format!("{:?}", df_plan);
assert!(s.contains("Filter"), "missing Filter: {}", s);
assert!(
s.contains("r__src_person_id") || s.contains("IsNotNull"),
"missing qualified rel column or IsNotNull in filter: {}",
s
);
}
#[test]
fn test_in_list_on_relationship_property_is_qualified() {
let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());
let scan_a = person_scan("a");
let expand = LogicalOperator::Expand {
input: Box::new(scan_a),
source_variable: "a".to_string(),
target_variable: "b".to_string(),
target_label: "Person".to_string(),
relationship_types: vec!["KNOWS".to_string()],
direction: RelationshipDirection::Outgoing,
relationship_variable: Some("r".to_string()),
properties: Default::default(),
target_properties: Default::default(),
};
let filter = LogicalOperator::Filter {
input: Box::new(expand),
predicate: BooleanExpression::In {
expression: ValueExpression::Property(PropertyRef {
variable: "r".into(),
property: "src_person_id".into(),
}),
list: vec![
ValueExpression::Literal(PropertyValue::Integer(1)),
ValueExpression::Literal(PropertyValue::Integer(2)),
],
},
};
let df_plan = planner.plan(&filter).unwrap();
let s = format!("{:?}", df_plan);
assert!(s.contains("Filter"), "missing Filter: {}", s);
assert!(
s.contains("r__src_person_id"),
"missing qualified rel column in IN list filter: {}",
s
);
}
#[test]
fn test_exists_and_in_on_node_props_materialized() {
let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());
let scan_a = person_scan("a");
let pred = BooleanExpression::And(
Box::new(BooleanExpression::Exists(PropertyRef {
variable: "a".into(),
property: "name".into(),
})),
Box::new(BooleanExpression::In {
expression: ValueExpression::Property(PropertyRef {
variable: "a".into(),
property: "age".into(),
}),
list: vec![
ValueExpression::Literal(PropertyValue::Integer(20)),
ValueExpression::Literal(PropertyValue::Integer(30)),
],
}),
);
let filter = LogicalOperator::Filter {
input: Box::new(scan_a),
predicate: pred,
};
let df_plan = planner.plan(&filter).unwrap();
let s = format!("{:?}", df_plan);
assert!(s.contains("Filter"), "missing Filter: {}", s);
assert!(
s.contains("a__name") || s.contains("IsNotNull"),
"missing EXISTS on a__name: {}",
s
);
assert!(
s.contains("a__age") || s.contains("age"),
"missing IN on a.age: {}",
s
);
}
}