use std::sync::Arc;
use hamelin_lib::err::{ContextualTranslationErrors, Stage};
use hamelin_lib::func::registry::FunctionRegistry;
use hamelin_lib::provider::{EnvironmentProvider, NoOpProvider};
use hamelin_lib::tree::ast::identifier::{CompoundIdentifier, Identifier, SimpleIdentifier};
use hamelin_lib::tree::typed_ast::context::StatementTranslationContext;
use hamelin_lib::tree::typed_ast::query::TypedStatement;
use crate::ir::IRStatement;
use crate::normalize::normalize_statement;
pub fn lower(statement: Arc<TypedStatement>) -> Result<IRStatement, ContextualTranslationErrors> {
NormalizationOptions::default().lower(statement)
}
pub fn normalize_with() -> NormalizationOptions {
NormalizationOptions::default()
}
#[derive(Clone)]
pub struct NormalizationOptions {
registry: Arc<FunctionRegistry>,
provider: Arc<dyn EnvironmentProvider>,
timestamp_field: Identifier,
message_field: Identifier,
lower_transform: bool,
skip_statement_passes: bool,
skip_pipeline_passes: bool,
skip_projection_fusion: bool,
}
impl Default for NormalizationOptions {
fn default() -> Self {
Self {
registry: Arc::new(FunctionRegistry::default()),
provider: Arc::new(NoOpProvider::default()),
timestamp_field: SimpleIdentifier::new("timestamp").into(),
message_field: CompoundIdentifier::new(
SimpleIdentifier::new("event"),
SimpleIdentifier::new("original"),
vec![],
)
.into(),
lower_transform: false,
skip_statement_passes: false,
skip_pipeline_passes: false,
skip_projection_fusion: false,
}
}
}
impl NormalizationOptions {
pub fn with_registry(mut self, registry: Arc<FunctionRegistry>) -> Self {
self.registry = registry;
self
}
pub fn with_provider(mut self, provider: Arc<dyn EnvironmentProvider>) -> Self {
self.provider = provider;
self
}
pub fn with_timestamp_field(mut self, field: impl Into<Identifier>) -> Self {
self.timestamp_field = field.into();
self
}
pub fn with_message_field(mut self, field: impl Into<Identifier>) -> Self {
self.message_field = field.into();
self
}
pub fn with_lower_transform(mut self) -> Self {
self.lower_transform = true;
self
}
pub fn skip_statement_passes(mut self) -> Self {
self.skip_statement_passes = true;
self
}
pub fn skip_pipeline_passes(mut self) -> Self {
self.skip_pipeline_passes = true;
self
}
pub fn skip_projection_fusion(mut self) -> Self {
self.skip_projection_fusion = true;
self
}
fn build_context(&self) -> StatementTranslationContext {
let mut ctx =
StatementTranslationContext::new(self.registry.clone(), self.provider.clone())
.with_timestamp_field(self.timestamp_field.clone())
.with_message_field(self.message_field.clone());
if self.lower_transform {
ctx = ctx.with_lower_transform();
}
if self.skip_statement_passes {
ctx = ctx.with_skip_statement_passes();
}
if self.skip_pipeline_passes {
ctx = ctx.with_skip_pipeline_passes();
}
if self.skip_projection_fusion {
ctx = ctx.with_skip_projection_fusion();
}
ctx
}
pub fn normalize(
self,
statement: Arc<TypedStatement>,
) -> Result<Arc<TypedStatement>, ContextualTranslationErrors> {
let mut ctx = self.build_context();
normalize_statement(statement, &mut ctx)
}
pub fn lower(
self,
statement: Arc<TypedStatement>,
) -> Result<IRStatement, ContextualTranslationErrors> {
let mut ctx = self.build_context();
let normalized = normalize_statement(statement, &mut ctx)?;
let hamelin = normalized.ast.to_string();
IRStatement::from_typed(normalized, &mut ctx).map_err(|e| {
ContextualTranslationErrors::new(
hamelin,
(*e).clone().with_stage(Stage::Lowering).single(),
)
})
}
}