use std::collections::HashMap;
use std::sync::Arc;
use datafusion::common::DFSchema;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::ast::identifier::Identifier;
use hamelin_translation::{IRCommand, IRCommandKind, IRPipeline};
use crate::command::{
translate_agg_command, translate_explode_command, translate_from_command,
translate_join_command, translate_limit_command, translate_select_command,
translate_sort_command, translate_where_command, translate_window_command,
};
use crate::expr::ExprTranslationContext;
/// Builds the seed plan handed to the first command of a pipeline.
///
/// The result is a [`LogicalPlan::EmptyRelation`] whose schema is
/// [`DFSchema::empty`] and whose `produce_one_row` flag is set.
fn empty_single_row_plan() -> LogicalPlan {
    let schema = Arc::new(DFSchema::empty());
    let relation = EmptyRelation {
        produce_one_row: true,
        schema,
    };
    LogicalPlan::EmptyRelation(relation)
}
/// Translates an IR pipeline into a single DataFusion [`LogicalPlan`].
///
/// Commands are translated strictly in order. The first command receives the
/// plan from `empty_single_row_plan`; every later command receives the plan
/// produced by the command before it. `ctx`, `ctes` and `expr_ctx` are passed
/// through unchanged to each command translation.
///
/// # Errors
///
/// Returns an error when the pipeline contains no commands, and otherwise
/// propagates the first error raised while translating a command (later
/// commands are not attempted).
pub async fn translate_pipeline_with_ctes(
    pipeline: &IRPipeline,
    ctx: &SessionContext,
    ctes: &HashMap<Identifier, Arc<LogicalPlan>>,
    expr_ctx: &ExprTranslationContext,
) -> Result<LogicalPlan, Arc<TranslationError>> {
    // Guard: there is nothing to translate, so no plan can be produced.
    if pipeline.commands.is_empty() {
        return Err(TranslationError::msg(pipeline, "Empty pipeline produces no plan").into());
    }

    // Thread the plan through each command; ownership of the previous plan
    // moves into the translation, which hands back the next one.
    let mut plan = empty_single_row_plan();
    for step in &pipeline.commands {
        plan = translate_command(step, plan, ctx, ctes, expr_ctx).await?;
    }

    Ok(plan)
}
/// Translates one IR command by dispatching on its kind to the matching
/// `translate_*_command` function.
///
/// `input` is the plan produced by the preceding command. Every kind except
/// `From` forwards it to its translator; `From` does not receive it. Only the
/// `From` and `Join` translations are awaited and are given the session
/// context and the CTE map; the remaining kinds are translated synchronously.
/// The `From`, `Window` and `Explode` translators additionally receive the
/// command's output schema.
///
/// # Errors
///
/// Returns whatever error the selected translator returns.
async fn translate_command(
    command: &IRCommand,
    input: LogicalPlan,
    ctx: &SessionContext,
    ctes: &HashMap<Identifier, Arc<LogicalPlan>>,
    expr_ctx: &ExprTranslationContext,
) -> Result<LogicalPlan, Arc<TranslationError>> {
    // Borrowed once here because several translators below need it.
    let output_schema = &command.output_schema;

    match &command.kind {
        // Awaited translations: these are the only ones given `ctx` and `ctes`.
        IRCommandKind::From(from_spec) => {
            translate_from_command(from_spec, ctx, ctes, output_schema, command, expr_ctx).await
        }
        IRCommandKind::Join(join_spec) => {
            translate_join_command(join_spec, input, ctx, ctes, command, expr_ctx).await
        }

        // Synchronous translations that also take the command's output schema.
        IRCommandKind::Window(window_spec) => {
            translate_window_command(window_spec, input, output_schema, command, expr_ctx)
        }
        IRCommandKind::Explode(explode_spec) => {
            translate_explode_command(explode_spec, input, output_schema, command, expr_ctx)
        }

        // Synchronous translations that take only the input plan and the command.
        IRCommandKind::Where(where_spec) => {
            translate_where_command(where_spec, input, command, expr_ctx)
        }
        IRCommandKind::Select(select_spec) => {
            translate_select_command(select_spec, input, command, expr_ctx)
        }
        IRCommandKind::Agg(agg_spec) => translate_agg_command(agg_spec, input, command, expr_ctx),
        IRCommandKind::Sort(sort_spec) => {
            translate_sort_command(sort_spec, input, command, expr_ctx)
        }
        IRCommandKind::Limit(limit_spec) => {
            translate_limit_command(limit_spec, input, command, expr_ctx)
        }
    }
}