use std::iter::once;
use crate::ast::pipeline::HamelinPipeline;
use crate::env::Environment;
use crate::translation::projection_builder::{ProjectionBuilder, ProjectionBuilderExt};
use crate::translation::{PendingQuery, PendingStatement};
use hamelin_lib::antlr::hamelinparser::{
AppendCommandContext, AppendCommandContextAttrs, SelectionContextAttrs,
TableReferenceContextAttrs,
};
use hamelin_lib::err::{TranslationError, TranslationErrors};
use hamelin_lib::sql::expression::apply::BinaryOperatorApply;
use hamelin_lib::sql::expression::identifier::HamelinIdentifier;
use hamelin_lib::sql::expression::identifier::{CompoundIdentifier, SimpleIdentifier};
use hamelin_lib::sql::expression::literal::ColumnReference;
use hamelin_lib::sql::expression::operator::Operator;
use hamelin_lib::sql::query::dml::{Insert, Merge, MergeInsert, MergeWhenNotMatched, DML};
use hamelin_lib::sql::query::{SQLQuery, TableReference};
pub fn translate(
ctx: &AppendCommandContext<'static>,
pipeline: &HamelinPipeline,
previous: &PendingQuery,
) -> Result<PendingStatement, TranslationErrors> {
let table = TableReference::new(
HamelinIdentifier::new(TranslationErrors::expect(
ctx,
ctx.tableReference().and_then(|tr| tr.identifier()),
)?)
.to_sql()?,
);
let table_struct = pipeline
.context
.provider
.reflect_columns(table.clone())
.map_err(|e| {
TranslationError::msg(ctx, "could not reflect schema for table")
.with_source_boxed(e.into())
.single()
})?;
let table_env = Environment::new(table_struct.clone());
let pb = ProjectionBuilder::deep_initialize_from_environment(&previous.env)
.expand(table_struct)
.map_err(|e| TranslationError::wrap(ctx, e).single())?;
let query = previous.query.replace_projections(
pb.build_projections()
.map_err(|e| TranslationError::wrap_box(ctx, e.into()).single())?,
);
let dml = create_dml(ctx, table, table_env.clone(), query)?;
Ok(PendingStatement::new_dml(dml, table_env))
}
fn prefix_column_reference(cr: &ColumnReference, prefix: &str) -> ColumnReference {
ColumnReference::new(
CompoundIdentifier::new(
once(SimpleIdentifier::new(prefix))
.chain(cr.identifier.simples().into_iter())
.collect(),
)
.into(),
)
}
fn create_dml(
ctx: &AppendCommandContext<'static>,
table: TableReference,
env: Environment,
query: SQLQuery,
) -> Result<DML, TranslationErrors> {
if !ctx.selection_all().is_empty() {
let distinct_by = ctx
.selection_all()
.into_iter()
.flat_map(|s| s.identifier())
.map(|i| HamelinIdentifier::new(i))
.collect::<Vec<_>>();
env.check_column_references(&distinct_by[..])?;
let expression = distinct_by
.into_iter()
.map(|hr| ColumnReference::new(hr.to_sql().unwrap()))
.map(|cr| {
BinaryOperatorApply::new(
Operator::Eq,
prefix_column_reference(&cr, "source").into(),
prefix_column_reference(&cr, "target").into(),
)
})
.reduce(|a, b| BinaryOperatorApply::new(Operator::And, a.into(), b.into()))
.unwrap();
let res = Merge {
table_alias: SimpleIdentifier::new("target"),
query_alias: SimpleIdentifier::new("source"),
table,
search_condition: expression.into(),
when_clauses: vec![MergeWhenNotMatched::new(
MergeInsert::new(
env.fields
.fields
.keys()
.into_iter()
.map(|si| ColumnReference::new(si.clone().into()))
.collect(),
)
.into(),
)
.into()],
query,
};
Ok(res.into())
} else {
let res = Insert {
table,
schema: env.fields,
query,
};
Ok(res.into())
}
}