hamelin_legacy 0.3.9

Legacy AST translation code for Hamelin (to be deprecated)
Documentation
use std::iter::once;

use crate::ast::pipeline::HamelinPipeline;
use crate::env::Environment;
use crate::translation::projection_builder::{ProjectionBuilder, ProjectionBuilderExt};
use crate::translation::{PendingQuery, PendingStatement};
use hamelin_lib::antlr::hamelinparser::{
    AppendCommandContext, AppendCommandContextAttrs, SelectionContextAttrs,
    TableReferenceContextAttrs,
};
use hamelin_lib::err::{TranslationError, TranslationErrors};
use hamelin_lib::sql::expression::apply::BinaryOperatorApply;
use hamelin_lib::sql::expression::identifier::HamelinIdentifier;
use hamelin_lib::sql::expression::identifier::{CompoundIdentifier, SimpleIdentifier};
use hamelin_lib::sql::expression::literal::ColumnReference;
use hamelin_lib::sql::expression::operator::Operator;
use hamelin_lib::sql::query::dml::{Insert, Merge, MergeInsert, MergeWhenNotMatched, DML};
use hamelin_lib::sql::query::{SQLQuery, TableReference};

pub fn translate(
    ctx: &AppendCommandContext<'static>,
    pipeline: &HamelinPipeline,
    previous: &PendingQuery,
) -> Result<PendingStatement, TranslationErrors> {
    let table = TableReference::new(
        HamelinIdentifier::new(TranslationErrors::expect(
            ctx,
            ctx.tableReference().and_then(|tr| tr.identifier()),
        )?)
        .to_sql()?,
    );
    let table_struct = pipeline
        .context
        .provider
        .reflect_columns(table.clone())
        .map_err(|e| {
            TranslationError::msg(ctx, "could not reflect schema for table")
                .with_source_boxed(e.into())
                .single()
        })?;
    let table_env = Environment::new(table_struct.clone());
    let pb = ProjectionBuilder::deep_initialize_from_environment(&previous.env)
        .expand(table_struct)
        .map_err(|e| TranslationError::wrap(ctx, e).single())?;
    let query = previous.query.replace_projections(
        pb.build_projections()
            .map_err(|e| TranslationError::wrap_box(ctx, e.into()).single())?,
    );
    let dml = create_dml(ctx, table, table_env.clone(), query)?;

    Ok(PendingStatement::new_dml(dml, table_env))
}

fn prefix_column_reference(cr: &ColumnReference, prefix: &str) -> ColumnReference {
    ColumnReference::new(
        CompoundIdentifier::new(
            once(SimpleIdentifier::new(prefix))
                .chain(cr.identifier.simples().into_iter())
                .collect(),
        )
        .into(),
    )
}

fn create_dml(
    ctx: &AppendCommandContext<'static>,
    table: TableReference,
    env: Environment,
    query: SQLQuery,
) -> Result<DML, TranslationErrors> {
    if !ctx.selection_all().is_empty() {
        let distinct_by = ctx
            .selection_all()
            .into_iter()
            .flat_map(|s| s.identifier())
            .map(|i| HamelinIdentifier::new(i))
            .collect::<Vec<_>>();

        env.check_column_references(&distinct_by[..])?;

        let expression = distinct_by
            .into_iter()
            .map(|hr| ColumnReference::new(hr.to_sql().unwrap()))
            .map(|cr| {
                BinaryOperatorApply::new(
                    Operator::Eq,
                    prefix_column_reference(&cr, "source").into(),
                    prefix_column_reference(&cr, "target").into(),
                )
            })
            .reduce(|a, b| BinaryOperatorApply::new(Operator::And, a.into(), b.into()))
            .unwrap(); // this should never happen because the list is not empty

        let res = Merge {
            table_alias: SimpleIdentifier::new("target"),
            query_alias: SimpleIdentifier::new("source"),
            table,
            search_condition: expression.into(),
            when_clauses: vec![MergeWhenNotMatched::new(
                MergeInsert::new(
                    env.fields
                        .fields
                        .keys()
                        .into_iter()
                        .map(|si| ColumnReference::new(si.clone().into()))
                        .collect(),
                )
                .into(),
            )
            .into()],
            query,
        };

        Ok(res.into())
    } else {
        let res = Insert {
            table,
            schema: env.fields,
            query,
        };

        Ok(res.into())
    }
}