use std::sync::Arc;
use datafusion::common::DFSchema;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan as DfPlan, LogicalPlanBuilder};
use datafusion::prelude::SessionContext;
use oxisql_parse::LogicalPlan as OxiPlan;
use crate::error::OxiSqlFusionError;
/// Converts an [`OxiPlan`] into a DataFusion logical plan.
///
/// Only three variants are converted directly:
///
/// * `Scan` is planned by running `SELECT * FROM "<table>"` (with an optional
///   `AS "<alias>"`) through [`SessionContext::sql`].
/// * `Limit` converts its input recursively and wraps the result with
///   [`LogicalPlanBuilder::limit`]; a missing offset is treated as `0`.
/// * `Empty` becomes an `EmptyRelation` with an empty schema that produces no
///   rows.
///
/// # Errors
///
/// * [`OxiSqlFusionError::DataFusion`] when DataFusion rejects the generated
///   SQL or fails to build the limit node.
/// * [`OxiSqlFusionError::UnsupportedType`] for every other plan variant, and
///   for a `LIMIT`/`OFFSET` value that cannot be represented as `usize`.
pub async fn to_datafusion_plan(
    plan: &OxiPlan,
    ctx: &SessionContext,
) -> Result<DfPlan, OxiSqlFusionError> {
    match plan {
        OxiPlan::Scan { table, alias, .. } => {
            // Names are spliced into SQL text, so embedded quotes must be
            // escaped or a crafted name could terminate the identifier early.
            let quoted_table = quote_sql_identifier(table);
            let sql = match alias {
                Some(a) => format!(
                    "SELECT * FROM {quoted_table} AS {}",
                    quote_sql_identifier(a)
                ),
                None => format!("SELECT * FROM {quoted_table}"),
            };
            let df = ctx.sql(&sql).await.map_err(OxiSqlFusionError::DataFusion)?;
            Ok(df.logical_plan().clone())
        }
        OxiPlan::Limit {
            count,
            offset,
            input,
        } => {
            // Recursive async calls need boxing to give the future a known size.
            let inner = Box::pin(to_datafusion_plan(input, ctx)).await?;

            // Checked conversions: an `as` cast would silently wrap or
            // truncate a value that does not fit in `usize`.
            let raw_offset = offset.unwrap_or(0);
            let skip = usize::try_from(raw_offset).map_err(|_| {
                OxiSqlFusionError::UnsupportedType(format!(
                    "OFFSET value {raw_offset} cannot be represented as a row count"
                ))
            })?;
            let fetch = count
                .map(|c| {
                    usize::try_from(c).map_err(|_| {
                        OxiSqlFusionError::UnsupportedType(format!(
                            "LIMIT value {c} cannot be represented as a row count"
                        ))
                    })
                })
                .transpose()?;

            LogicalPlanBuilder::from(inner)
                .limit(skip, fetch)
                .map_err(OxiSqlFusionError::DataFusion)?
                .build()
                .map_err(OxiSqlFusionError::DataFusion)
        }
        OxiPlan::Empty => {
            let empty_schema = Arc::new(DFSchema::empty());
            Ok(DfPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: empty_schema,
            }))
        }
        other => Err(OxiSqlFusionError::UnsupportedType(format!(
            "plan variant '{}' is not directly convertible; use sql_to_datafusion_plan \
             with the original SQL string instead",
            plan_variant_name(other)
        ))),
    }
}

/// Renders `ident` as a double-quoted SQL identifier, doubling any embedded
/// `"` so the value cannot close the quoted identifier prematurely.
fn quote_sql_identifier(ident: impl std::fmt::Display) -> String {
    format!("\"{}\"", ident.to_string().replace('"', "\"\""))
}
/// Plans a raw SQL string with DataFusion and returns the resulting logical
/// plan.
///
/// The statement is handed to [`SessionContext::sql`] unchanged; the logical
/// plan of the returned data frame is cloned out and returned.
///
/// # Errors
///
/// Returns [`OxiSqlFusionError::DataFusion`] wrapping whatever error
/// `ctx.sql` reports.
pub async fn sql_to_datafusion_plan(
    sql: &str,
    ctx: &SessionContext,
) -> Result<DfPlan, OxiSqlFusionError> {
    let frame = ctx.sql(sql).await.map_err(OxiSqlFusionError::DataFusion)?;
    Ok(frame.logical_plan().clone())
}
/// Returns the name of `plan`'s enum variant as a static string.
///
/// Used to build the "not directly convertible" error message. Arms are
/// listed alphabetically. There is no wildcard arm, so the match must name
/// every variant of [`OxiPlan`].
fn plan_variant_name(plan: &OxiPlan) -> &'static str {
    match *plan {
        OxiPlan::Aggregate { .. } => "Aggregate",
        OxiPlan::Cte { .. } => "Cte",
        OxiPlan::CteRef { .. } => "CteRef",
        OxiPlan::Empty => "Empty",
        OxiPlan::Exists { .. } => "Exists",
        OxiPlan::Filter { .. } => "Filter",
        OxiPlan::InSubquery { .. } => "InSubquery",
        OxiPlan::Join { .. } => "Join",
        OxiPlan::Limit { .. } => "Limit",
        OxiPlan::Project { .. } => "Project",
        OxiPlan::Scan { .. } => "Scan",
        OxiPlan::SetOp { .. } => "SetOp",
        OxiPlan::Sort { .. } => "Sort",
        OxiPlan::Subquery { .. } => "Subquery",
        OxiPlan::Values { .. } => "Values",
        OxiPlan::Window { .. } => "Window",
    }
}