hamelin_legacy 0.3.9

Legacy AST translation code for Hamelin (to be deprecated)
Documentation
use antlr_rust::tree::ParseTree;
use hamelin_lib::sql::expression::operator::Operator;
use regex::Regex;

use crate::ast::expression::HamelinExpression;
use crate::ast::pipeline::HamelinPipeline;
use crate::ast::string::HamelinStringLiteral;
use crate::env::Environment;
use crate::translation::projection_builder::ProjectionBuilder;
use crate::translation::sql_query_helpers::{add_filter_condition, prepend_projections};
use crate::translation::PendingQuery;
use hamelin_lib::antlr::hamelinparser::{ParseCommandContext, ParseCommandContextAttrs};
use hamelin_lib::err::{TranslationError, TranslationErrors};
use hamelin_lib::sql::expression::apply::BinaryOperatorApply;
use hamelin_lib::sql::expression::identifier::HamelinIdentifier;
use hamelin_lib::sql::expression::identifier::{CompoundIdentifier, Identifier};
use hamelin_lib::sql::expression::literal::{ColumnReference, IntegerLiteral, StringLiteral};
use hamelin_lib::sql::expression::regexp::{RegexpCountFunction, RegexpExtractFunction};
use hamelin_lib::sql::expression::{Leaf, SQLExpression};
use hamelin_lib::types::STRING;

/// Translates a `PARSE` command into a [`PendingQuery`] that extracts new string columns
/// from a source expression using an anchor pattern.
///
/// * `ctx` - parse-tree node of the `PARSE` command: optional source expression (`src`),
///   the pattern string literal, the target identifiers, and the optional `NODROP` token.
/// * `pipeline` - supplies the expression translation context for the source expression.
/// * `previous` - the query built so far, together with its environment.
///
/// # Pattern syntax
///
/// Every `*` in the pattern is an anchor that captures text for one target identifier, in
/// order. All regex-special characters in the pattern are escaped, each `*` becomes the
/// lazy group `(.*?)`, and two adjacent anchors collapse into one greedy group `(.*)`.
/// The final regex is prefixed with `(?s)`. The number of capture groups must equal the
/// number of target identifiers; identifiers named `_` still consume a group but are not
/// bound to a column.
///
/// # Result
///
/// The new columns are typed `STRING` and prepended (overwriting) onto the previous
/// environment. When `NODROP` is absent, a filter is added that keeps only rows for which
/// the regexp count of the pattern over the source is greater than zero.
///
/// # Errors
///
/// Returns [`TranslationErrors`] when:
/// * the explicit source expression fails to translate or is not of type `STRING`;
/// * no source is given and `event.original` cannot be looked up in `previous.env`;
/// * the pattern is not a string literal, contains no `*`, or its anchor count differs
///   from the number of target identifiers;
/// * a target identifier cannot be converted to SQL, or the projections cannot be built.
///
/// Source and pattern errors are reported together. A missing pattern node or an
/// identifier conversion failure returns immediately via `?`.
pub fn translate(
    ctx: &ParseCommandContext<'static>,
    pipeline: &HamelinPipeline,
    previous: &PendingQuery,
) -> Result<PendingQuery, TranslationErrors> {
    // The target identifiers are needed for validation, error context and binding below.
    // Fetch them once instead of re-querying the parse tree at every use.
    let identifiers = ctx.identifier_all();

    // Figure out the source of parsing.
    // Hold onto the errors so that we can report them along with others, all at once.
    let maybe_source = match &ctx.src {
        Some(src) => HamelinExpression::new(
            src.clone(),
            pipeline
                .context
                .default_expression_translation_context(&previous.env),
        )
        .translate()
        .and_then(|t| {
            if t.typ == STRING {
                Ok(t.sql)
            } else {
                Err(TranslationError::msg(
                    TranslationErrors::expect(ctx, ctx.PARSE_COMMAND())?.as_ref(),
                    "PARSE source must be a string",
                )
                .with_context(src.as_ref(), &format!("found {}", t.typ.to_string()))
                .into())
            }
        }),
        None => {
            // No explicit source: fall back to the `event.original` column, which must
            // already exist in the incoming environment.
            let ident: Identifier = CompoundIdentifier::from_two("event", "original").into();

            previous
                .env
                .lookup(&ident)
                .map(|_| ColumnReference::new(ident).into())
                .map_err(|e| {
                    TranslationError::msg(ctx, "cannot determine source column for parsing")
                        .with_source(e)
                        .single()
                })
        }
    };

    // All the columns are extracted through a single regex pattern.
    let maybe_regex_pattern = {
        // NOTE(review): this `?` returns before `maybe_source` errors are combined below.
        let pattern_tree = TranslationErrors::expect(ctx, ctx.string())?;

        HamelinStringLiteral::new(pattern_tree.clone())
            .translate()
            .and_then(|t| match t {
                SQLExpression::Leaf(Leaf::StringLiteral(s)) => Ok(s.value),
                _ => TranslationError::msg(
                    pattern_tree.as_ref(),
                    "PARSE pattern must be a string literal",
                )
                .single_result(),
            })
            // Actually translate the anchor style into the regex style.
            .map(|pattern_string| {
                // A lazy group at the very end of a regex has nothing after it to force it
                // to expand, so it would capture the empty string. Doubling a single
                // trailing `*` makes the rewrite below turn it into a greedy group.
                let pattern_adapted_for_last_character_anchor =
                    if pattern_string.ends_with("*") && !pattern_string.ends_with("**") {
                        pattern_string + "*"
                    } else {
                        pattern_string
                    };

                let special_re_characters = Regex::new(r"[-\[\]{}()+?.,\\^$|#]").unwrap();
                special_re_characters
                    // Escape all characters in the pattern that might have a special meaning in
                    // regex (`*` is deliberately not in the class: it is the anchor marker).
                    .replace_all(&pattern_adapted_for_last_character_anchor, "\\$0")
                    // Then we turn all `*` anchors into non-greedy matches
                    .replace("*", "(.*?)")
                    // Then we turn all 2 consecutive non-greedies into a greedy
                    .replace("(.*?)(.*?)", "(.*)")
            })
            .and_then(|regex_pattern| {
                // Do some error checking on the anchor pattern -- make sure it matches the number of cols.
                // Literal parentheses from the user were escaped above, so every unescaped
                // group found here stems from an anchor.
                let anchor_count_in_regex =
                    regex_pattern.matches("(.*?)").count() + regex_pattern.matches("(.*)").count();
                if anchor_count_in_regex == 0 {
                    Err(TranslationError::msg(
                        pattern_tree.as_ref(),
                        "anchor pattern must contain at least one *",
                    )
                    .into())
                } else if anchor_count_in_regex != identifiers.len() {
                    let mut err = TranslationError::msg(
                        pattern_tree.as_ref(),
                        &format!(
                            "anchor pattern must contain a * for {} new columns",
                            identifiers.len()
                        ),
                    );
                    // Point at every target identifier, labelled with its zero-based position.
                    for (i, ident) in identifiers.iter().enumerate() {
                        err = err.with_context(ident.as_ref(), &format!("column {}", i));
                    }
                    Err(err.into())
                } else {
                    // `(?s)` is prepended so that `.` also matches line terminators.
                    Ok(format!("(?s){}", regex_pattern))
                }
            })
    };

    // Surface source and pattern problems together rather than one at a time.
    let (source, regex_pattern) = TranslationErrors::from_2(maybe_source, maybe_regex_pattern)?;

    // And now we just introduce the new columns via the `regexp_extract` SQL
    // function, with the special case of skipping throwaway columns `_`
    let mut new_projection = ProjectionBuilder::default();
    for (i, ident) in identifiers.iter().enumerate() {
        let identifier = HamelinIdentifier::new(ident.clone()).to_sql()?;
        let first: Identifier = identifier.first().clone().into();

        // When the leading segment of the identifier already names a struct in the incoming
        // environment, seed the projection with that struct type (once per key) before
        // binding into it.
        if let Ok(t) = previous.env.lookup(&first) {
            if let Ok(struct_type) = t.try_unwrap_struct() {
                if !new_projection.is_present(&first) {
                    new_projection.initialize_key(identifier.first().clone(), struct_type.clone());
                }
            }
        }

        if ident.get_text() != "_" {
            new_projection.bind(
                identifier,
                RegexpExtractFunction::new(
                    source.clone(),
                    StringLiteral::new(&regex_pattern),
                    // The identifier at position `i` maps to capture group `i + 1`.
                    i + 1,
                )
                .into(),
                STRING,
            );
        }
    }

    let mut env = Environment::new(new_projection.clone().build_hamelin_type());

    // Layer the new columns onto the incoming environment.
    env = previous.env.clone().prepend_overwrite(&env);

    let mut query = prepend_projections(
        &previous.query,
        new_projection
            .build_projections()
            .map_err(|e| TranslationError::wrap_box(ctx, e.into()))?,
        &env,
    );

    // Without NODROP, rows whose source does not match the pattern are filtered out.
    if ctx.NODROP().is_none() {
        query = add_filter_condition(
            &query,
            BinaryOperatorApply::new(
                Operator::Gt,
                RegexpCountFunction::new(source, StringLiteral::new(&regex_pattern)).into(),
                IntegerLiteral::new("0").into(),
            )
            .into(),
            &env,
        );
    }

    Ok(PendingQuery::new(query, env))
}