use super::analysis::{PlanningContext, RelationshipInstance};
use crate::ast::PropertyValue;
use crate::case_insensitive::qualify_column;
use crate::error::Result;
use datafusion::logical_expr::{col, BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
use lance_graph_catalog::GraphSourceCatalog;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use super::DataFusionPlanner;
impl DataFusionPlanner {
pub(crate) fn build_scan(
&self,
_ctx: &PlanningContext,
variable: &str,
label: &str,
properties: &HashMap<String, PropertyValue>,
) -> Result<LogicalPlan> {
if let Some(cat) = &self.catalog {
if let Some(source) = cat.node_source(label) {
let schema = source.schema();
let normalized_label = label.to_lowercase();
let mut builder = LogicalPlanBuilder::scan(&normalized_label, source, None)
.map_err(|e| {
self.plan_error(&format!("Failed to scan node source '{}'", label), e)
})?;
if !properties.is_empty() {
let filter_exprs: Vec<Expr> = properties
.iter()
.map(|(k, v)| {
let lit_expr = super::expression::to_df_value_expr(
&crate::ast::ValueExpression::Literal(v.clone()),
);
Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(k)),
op: Operator::Eq,
right: Box::new(lit_expr),
})
})
.collect();
let combined_filter = filter_exprs
.into_iter()
.reduce(|acc, expr| {
Expr::BinaryExpr(BinaryExpr {
left: Box::new(acc),
op: Operator::And,
right: Box::new(expr),
})
})
.unwrap();
builder = builder
.filter(combined_filter)
.map_err(|e| self.plan_error("Failed to apply property filters", e))?;
}
let qualified_exprs: Vec<Expr> = schema
.fields()
.iter()
.map(|field| {
let qualified_name = qualify_column(variable, field.name());
col(field.name()).alias(&qualified_name)
})
.collect();
builder = builder
.project(qualified_exprs)
.map_err(|e| self.plan_error("Failed to project qualified columns", e))?;
return builder
.build()
.map_err(|e| self.plan_error("Failed to build scan plan", e));
} else {
return Err(crate::error::GraphError::ConfigError {
message: format!(
"Node label '{}' not found in catalog. \
Ensure the label is registered in your GraphConfig with .with_node_label()",
label
),
location: snafu::Location::new(file!(), line!(), column!()),
});
}
}
let empty_source = Arc::new(lance_graph_catalog::SimpleTableSource::empty());
let normalized_label = label.to_lowercase();
let builder =
LogicalPlanBuilder::scan(&normalized_label, empty_source, None).map_err(|e| {
self.plan_error(&format!("Failed to create table scan for '{}'", label), e)
})?;
builder
.build()
.map_err(|e| self.plan_error("Failed to build scan plan", e))
}
pub(crate) fn build_relationship_scan(
&self,
rel_instance: &RelationshipInstance,
rel_source: Arc<dyn datafusion::logical_expr::TableSource>,
relationship_properties: &HashMap<String, PropertyValue>,
) -> Result<LogicalPlan> {
let rel_schema = rel_source.schema();
let normalized_rel_type = rel_instance.rel_type.to_lowercase();
let mut rel_builder = LogicalPlanBuilder::scan(&normalized_rel_type, rel_source, None)
.map_err(|e| {
self.plan_error(
&format!("Failed to scan relationship '{}'", rel_instance.rel_type),
e,
)
})?;
for (k, v) in relationship_properties.iter() {
let lit_expr = super::expression::to_df_value_expr(
&crate::ast::ValueExpression::Literal(v.clone()),
);
let filter_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(k)),
op: Operator::Eq,
right: Box::new(lit_expr),
});
rel_builder = rel_builder.filter(filter_expr).map_err(|e| {
self.plan_error(
&format!("Failed to apply relationship filter on '{}'", k),
e,
)
})?;
}
let rel_qualified_exprs: Vec<Expr> = rel_schema
.fields()
.iter()
.map(|field| {
let qualified_name = qualify_column(&rel_instance.alias, field.name());
col(field.name()).alias(&qualified_name)
})
.collect();
rel_builder
.project(rel_qualified_exprs)
.map_err(|e| self.plan_error("Failed to project relationship columns", e))?
.build()
.map_err(|e| self.plan_error("Failed to build relationship scan", e))
}
pub(crate) fn get_expected_varlength_columns(
&self,
ctx: &PlanningContext,
source_variable: &str,
target_variable: &str,
) -> Result<HashSet<String>> {
let mut expected = HashSet::new();
let Some(cat) = &self.catalog else {
return Ok(expected);
};
if let Some(source_label) = ctx.analysis.var_to_label.get(source_variable) {
if let Some(source) = cat.node_source(source_label) {
for field in source.schema().fields() {
expected.insert(qualify_column(source_variable, field.name()));
}
}
}
if let Some(target_label) = ctx.analysis.var_to_label.get(target_variable) {
if let Some(target) = cat.node_source(target_label) {
for field in target.schema().fields() {
expected.insert(qualify_column(target_variable, field.name()));
}
}
}
Ok(expected)
}
pub(crate) fn build_qualified_relationship_scan(
&self,
catalog: &Arc<dyn GraphSourceCatalog>,
rel_instance: &RelationshipInstance,
) -> Result<LogicalPlan> {
let rel_source = catalog
.relationship_source(&rel_instance.rel_type)
.ok_or_else(|| crate::error::GraphError::ConfigError {
message: format!(
"No table source found for relationship: {}",
rel_instance.rel_type
),
location: snafu::Location::new(file!(), line!(), column!()),
})?;
let rel_schema = rel_source.schema();
let normalized_rel_type = rel_instance.rel_type.to_lowercase();
let rel_builder = LogicalPlanBuilder::scan(&normalized_rel_type, rel_source, None)
.map_err(|e| crate::error::GraphError::PlanError {
message: format!("Failed to scan relationship: {}", e),
location: snafu::Location::new(file!(), line!(), column!()),
})?;
let rel_alias_lower = rel_instance.alias.to_lowercase();
let rel_qualified_exprs: Vec<Expr> = rel_schema
.fields()
.iter()
.map(|field| {
let qualified_name = qualify_column(&rel_alias_lower, field.name());
col(field.name()).alias(&qualified_name)
})
.collect();
rel_builder
.project(rel_qualified_exprs)
.map_err(|e| crate::error::GraphError::PlanError {
message: format!("Failed to project relationship: {}", e),
location: snafu::Location::new(file!(), line!(), column!()),
})?
.build()
.map_err(|e| crate::error::GraphError::PlanError {
message: format!("Failed to build relationship scan: {}", e),
location: snafu::Location::new(file!(), line!(), column!()),
})
}
pub(crate) fn build_qualified_target_scan(
&self,
catalog: &Arc<dyn GraphSourceCatalog>,
target_label: &str,
target_variable: &str,
target_properties: &HashMap<String, PropertyValue>,
) -> Result<LogicalPlan> {
let target_source = catalog.node_source(target_label).ok_or_else(|| {
crate::error::GraphError::ConfigError {
message: format!("No table source found for node label: {}", target_label),
location: snafu::Location::new(file!(), line!(), column!()),
}
})?;
let target_schema = target_source.schema();
let normalized_target_label = target_label.to_lowercase();
let mut target_builder =
LogicalPlanBuilder::scan(&normalized_target_label, target_source, None).map_err(
|e| crate::error::GraphError::PlanError {
message: format!("Failed to scan target node: {}", e),
location: snafu::Location::new(file!(), line!(), column!()),
},
)?;
for (k, v) in target_properties.iter() {
let lit_expr = super::expression::to_df_value_expr(
&crate::ast::ValueExpression::Literal(v.clone()),
);
let filter_expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(col(k)),
op: Operator::Eq,
right: Box::new(lit_expr),
});
target_builder = target_builder.filter(filter_expr).map_err(|e| {
crate::error::GraphError::PlanError {
message: format!("Failed to apply target property filter: {}", e),
location: snafu::Location::new(file!(), line!(), column!()),
}
})?;
}
let target_var_lower = target_variable.to_lowercase();
let target_qualified_exprs: Vec<Expr> = target_schema
.fields()
.iter()
.map(|field| {
let qualified_name = qualify_column(&target_var_lower, field.name());
col(field.name()).alias(&qualified_name)
})
.collect();
target_builder
.project(target_qualified_exprs)
.map_err(|e| crate::error::GraphError::PlanError {
message: format!("Failed to project target node: {}", e),
location: snafu::Location::new(file!(), line!(), column!()),
})?
.build()
.map_err(|e| crate::error::GraphError::PlanError {
message: format!("Failed to build target scan: {}", e),
location: snafu::Location::new(file!(), line!(), column!()),
})
}
}
#[cfg(test)]
mod tests {
use crate::ast::{PropertyRef, PropertyValue, ValueExpression};
use crate::datafusion_planner::{
test_fixtures::{make_catalog, person_config, person_scan},
DataFusionPlanner, GraphPhysicalPlanner,
};
use crate::logical_plan::{LogicalOperator, ProjectionItem};
#[test]
fn test_df_planner_inline_property_filter() {
let mut props = std::collections::HashMap::new();
props.insert(
"name".to_string(),
PropertyValue::String("Alice".to_string()),
);
let mut scan = person_scan("n");
if let LogicalOperator::ScanByLabel { properties, .. } = &mut scan {
*properties = props;
}
let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());
let df_plan = planner.plan(&scan).unwrap();
let s = format!("{:?}", df_plan);
assert!(s.contains("Filter"), "plan missing Filter: {}", s);
assert!(s.contains("TableScan"), "plan missing TableScan: {}", s);
assert!(
s.contains("Person") || s.contains("person"),
"plan missing table name: {}",
s
);
}
#[test]
fn test_scan_aliasing_projects_variable_prefixed_columns() {
let scan = person_scan("n");
let project = LogicalOperator::Project {
input: Box::new(scan),
projections: vec![ProjectionItem {
expression: ValueExpression::Property(PropertyRef {
variable: "n".into(),
property: "name".into(),
}),
alias: None,
}],
};
let cfg = crate::config::GraphConfig::builder()
.with_node_label("Person", "id")
.build()
.unwrap();
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());
let df_plan = planner.plan(&project).unwrap();
let s = format!("{:?}", df_plan);
assert!(s.contains("Projection"), "plan missing Projection: {}", s);
assert!(
s.contains("n__name"),
"missing qualified projected column n__name: {}",
s
);
}
#[test]
fn test_temp_variable_with_underscores_in_source() {
let cfg = crate::config::GraphConfig::builder()
.with_node_label("Person", "id")
.with_relationship("KNOWS", "src_person_id", "dst_person_id")
.build()
.unwrap();
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());
let scan = LogicalOperator::ScanByLabel {
variable: "foo_bar".to_string(), label: "Person".to_string(),
properties: Default::default(),
};
let var_expand = LogicalOperator::VariableLengthExpand {
input: Box::new(scan),
source_variable: "foo_bar".to_string(), target_variable: "baz".to_string(),
relationship_types: vec!["KNOWS".to_string()],
direction: crate::ast::RelationshipDirection::Outgoing,
min_length: Some(2),
max_length: Some(2),
relationship_variable: None,
target_properties: Default::default(),
};
let result = planner.plan(&var_expand);
assert!(
result.is_ok(),
"Should handle source variables with underscores: {:?}",
result.err()
);
}
#[test]
fn test_scan_missing_node_label_with_catalog_fails_fast() {
let cfg = crate::config::GraphConfig::builder()
.with_node_label("Person", "id")
.build()
.unwrap();
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());
let scan = LogicalOperator::ScanByLabel {
variable: "x".to_string(),
label: "NonExistentLabel".to_string(), properties: Default::default(),
};
let result = planner.plan(&scan);
assert!(
result.is_err(),
"Should fail when catalog exists but label is missing"
);
match result {
Err(crate::error::GraphError::ConfigError { message, .. }) => {
assert!(
message.contains("NonExistentLabel"),
"Error should mention the missing label"
);
assert!(
message.contains("not found"),
"Error should indicate label not found"
);
}
_ => panic!("Expected ConfigError for missing node label"),
}
}
#[test]
fn test_scan_without_catalog_uses_empty_source() {
let cfg = crate::config::GraphConfig::builder()
.with_node_label("Person", "id")
.build()
.unwrap();
let planner = DataFusionPlanner::new(cfg);
let scan = LogicalOperator::ScanByLabel {
variable: "x".to_string(),
label: "AnyLabel".to_string(), properties: Default::default(),
};
let result = planner.plan(&scan);
assert!(
result.is_ok(),
"Should succeed with empty source when no catalog attached"
);
}
#[test]
fn test_expand_with_missing_relationship() {
let cfg = crate::config::GraphConfig::builder()
.with_node_label("Person", "id")
.with_relationship("KNOWS", "src_id", "dst_id")
.build()
.unwrap();
let planner = DataFusionPlanner::with_catalog(cfg, make_catalog());
let scan = LogicalOperator::ScanByLabel {
variable: "a".to_string(),
label: "Person".to_string(),
properties: Default::default(),
};
let expand = LogicalOperator::Expand {
input: Box::new(scan),
source_variable: "a".to_string(),
target_variable: "b".to_string(),
target_label: "Person".to_string(),
relationship_types: vec!["NONEXISTENT_REL".to_string()], direction: crate::ast::RelationshipDirection::Outgoing,
relationship_variable: None,
properties: Default::default(),
target_properties: Default::default(),
};
let result = planner.plan(&expand);
match result {
Ok(_) => {} Err(e) => {
assert!(matches!(e, crate::error::GraphError::PlanError { .. }));
}
}
}
}