use std::sync::Arc;
use datafusion::common::DFSchema;
use datafusion::logical_expr::{
EmptyRelation, Expr, LogicalPlan as DfPlan, LogicalPlanBuilder, SortExpr as DfSortExpr,
};
use datafusion::prelude::SessionContext;
use oxisql_parse::LogicalPlan as OxiPlan;
use crate::error::OxiSqlFusionError;
/// Structurally lowers an oxisql logical plan into a DataFusion logical plan.
///
/// Only `Scan`, `Filter`, `Project`, `Sort`, `Limit` and `Empty` nodes are
/// handled here. Child plans are lowered recursively; expression strings
/// (predicates, projection columns, sort keys) are parsed against the schema
/// of the already-lowered child plan.
///
/// # Errors
///
/// * [`OxiSqlFusionError::DataFusion`] when DataFusion rejects the generated
///   scan statement or a plan-builder step.
/// * [`OxiSqlFusionError::UnsupportedType`] when an expression string cannot
///   be parsed, a projection contains a bare `*`, a limit offset/count does
///   not fit in `usize`, or the plan variant has no structural lowering.
///   In those cases the caller is pointed at [`sql_to_datafusion_plan`].
pub async fn to_datafusion_plan(
    plan: &OxiPlan,
    ctx: &SessionContext,
) -> Result<DfPlan, OxiSqlFusionError> {
    match plan {
        OxiPlan::Scan { table, alias, .. } => {
            // Identifiers are emitted double-quoted, so any embedded quote
            // must be doubled or it would terminate the identifier early.
            let table_ident = quote_identifier(table);
            let sql = match alias {
                Some(a) => format!("SELECT * FROM {table_ident} AS {}", quote_identifier(a)),
                None => format!("SELECT * FROM {table_ident}"),
            };
            let df = ctx.sql(&sql).await.map_err(OxiSqlFusionError::DataFusion)?;
            Ok(df.logical_plan().clone())
        }
        OxiPlan::Filter { input, predicate } => {
            // Recursive async calls must be boxed to give the future a known size.
            let df_input = Box::pin(to_datafusion_plan(input, ctx)).await?;
            let df_schema: &DFSchema = df_input.schema().as_ref();
            let expr = parse_structural_expr(ctx, predicate, df_schema, "Filter predicate")?;
            LogicalPlanBuilder::from(df_input)
                .filter(expr)
                .map_err(OxiSqlFusionError::DataFusion)?
                .build()
                .map_err(OxiSqlFusionError::DataFusion)
        }
        OxiPlan::Project { input, columns } => {
            // Reject wildcards before doing any work on the child plan.
            if columns.iter().any(|c| c.trim() == "*") {
                return Err(OxiSqlFusionError::UnsupportedType(
                    "Project with wildcard '*' cannot be lowered structurally; \
                     use sql_to_datafusion_plan with the original SQL instead"
                        .to_string(),
                ));
            }
            let df_input = Box::pin(to_datafusion_plan(input, ctx)).await?;
            let df_schema: &DFSchema = df_input.schema().as_ref();
            let exprs = columns
                .iter()
                .map(|col_expr| {
                    parse_structural_expr(ctx, col_expr, df_schema, "Project column expression")
                })
                .collect::<Result<Vec<Expr>, OxiSqlFusionError>>()?;
            LogicalPlanBuilder::from(df_input)
                .project(exprs)
                .map_err(OxiSqlFusionError::DataFusion)?
                .build()
                .map_err(OxiSqlFusionError::DataFusion)
        }
        OxiPlan::Sort { input, order_by } => {
            let df_input = Box::pin(to_datafusion_plan(input, ctx)).await?;
            let df_schema: &DFSchema = df_input.schema().as_ref();
            let mut sort_exprs: Vec<DfSortExpr> = Vec::with_capacity(order_by.len());
            for se in order_by {
                let base_expr =
                    parse_structural_expr(ctx, se.column.as_str(), df_schema, "Sort key")?;
                // NOTE(review): the third argument (nulls_first) is hard-coded
                // to `true` for both ascending and descending keys — confirm
                // this matches the NULL ordering the source query expects.
                sort_exprs.push(DfSortExpr::new(base_expr, se.ascending, true));
            }
            LogicalPlanBuilder::from(df_input)
                .sort(sort_exprs)
                .map_err(OxiSqlFusionError::DataFusion)?
                .build()
                .map_err(OxiSqlFusionError::DataFusion)
        }
        OxiPlan::Limit {
            count,
            offset,
            input,
        } => {
            // Convert with range checks instead of `as`, which would silently
            // wrap or truncate values that do not fit in `usize`.
            let raw_offset = offset.unwrap_or(0);
            let skip = usize::try_from(raw_offset).map_err(|_| {
                OxiSqlFusionError::UnsupportedType(format!(
                    "Limit offset {raw_offset} cannot be represented as a row count"
                ))
            })?;
            let fetch = count
                .map(|c| {
                    usize::try_from(c).map_err(|_| {
                        OxiSqlFusionError::UnsupportedType(format!(
                            "Limit count {c} cannot be represented as a row count"
                        ))
                    })
                })
                .transpose()?;
            let inner = Box::pin(to_datafusion_plan(input, ctx)).await?;
            LogicalPlanBuilder::from(inner)
                .limit(skip, fetch)
                .map_err(OxiSqlFusionError::DataFusion)?
                .build()
                .map_err(OxiSqlFusionError::DataFusion)
        }
        OxiPlan::Empty => {
            let empty_schema = Arc::new(DFSchema::empty());
            Ok(DfPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: empty_schema,
            }))
        }
        other => Err(OxiSqlFusionError::UnsupportedType(format!(
            "plan variant '{}' is not directly convertible; use sql_to_datafusion_plan \
             with the original SQL string instead",
            plan_variant_name(other)
        ))),
    }
}

/// Wraps `ident` in double quotes, doubling any embedded `"` so the result is
/// a single well-formed quoted SQL identifier.
fn quote_identifier(ident: impl std::fmt::Display) -> String {
    format!("\"{}\"", ident.to_string().replace('"', "\"\""))
}

/// Parses the SQL expression text `sql` against `schema`; on failure returns
/// an `UnsupportedType` error that names the kind of expression (`what`) and
/// points the caller at `sql_to_datafusion_plan`.
fn parse_structural_expr(
    ctx: &SessionContext,
    sql: &str,
    schema: &DFSchema,
    what: &str,
) -> Result<Expr, OxiSqlFusionError> {
    ctx.parse_sql_expr(sql, schema).map_err(|_| {
        OxiSqlFusionError::UnsupportedType(format!(
            "{what} '{sql}' could not be parsed structurally; \
             use sql_to_datafusion_plan with the original SQL instead"
        ))
    })
}
/// Plans a raw SQL string with the given session context and returns the
/// resulting DataFusion logical plan.
///
/// # Errors
///
/// Any error reported by `ctx.sql` is wrapped in
/// [`OxiSqlFusionError::DataFusion`].
pub async fn sql_to_datafusion_plan(
    sql: &str,
    ctx: &SessionContext,
) -> Result<DfPlan, OxiSqlFusionError> {
    let data_frame = ctx.sql(sql).await.map_err(OxiSqlFusionError::DataFusion)?;
    // The DataFrame only lends out its plan, so hand back an owned copy.
    let logical = data_frame.logical_plan().clone();
    Ok(logical)
}
/// Returns the variant name of `plan` as a static string, for use in
/// diagnostics.
///
/// The match is deliberately exhaustive (no wildcard arm) so that a new
/// `OxiPlan` variant produces a compile error here until it is named.
fn plan_variant_name(plan: &OxiPlan) -> &'static str {
    let variant = match plan {
        // Arms are listed alphabetically; order has no effect on the result.
        OxiPlan::Aggregate { .. } => "Aggregate",
        OxiPlan::Compute { .. } => "Compute",
        OxiPlan::Cte { .. } => "Cte",
        OxiPlan::CteRef { .. } => "CteRef",
        OxiPlan::Empty => "Empty",
        OxiPlan::Exists { .. } => "Exists",
        OxiPlan::Filter { .. } => "Filter",
        OxiPlan::InSubquery { .. } => "InSubquery",
        OxiPlan::Join { .. } => "Join",
        OxiPlan::Limit { .. } => "Limit",
        OxiPlan::Project { .. } => "Project",
        OxiPlan::Scan { .. } => "Scan",
        OxiPlan::SetOp { .. } => "SetOp",
        OxiPlan::Sort { .. } => "Sort",
        OxiPlan::Subquery { .. } => "Subquery",
        OxiPlan::Values { .. } => "Values",
        OxiPlan::Window { .. } => "Window",
    };
    variant
}