use antlr_rust::tree::ParseTree;
use hamelin_lib::sql::expression::operator::Operator;
use regex::Regex;
use crate::ast::expression::HamelinExpression;
use crate::ast::pipeline::HamelinPipeline;
use crate::ast::string::HamelinStringLiteral;
use crate::env::Environment;
use crate::translation::projection_builder::ProjectionBuilder;
use crate::translation::sql_query_helpers::{add_filter_condition, prepend_projections};
use crate::translation::PendingQuery;
use hamelin_lib::antlr::hamelinparser::{ParseCommandContext, ParseCommandContextAttrs};
use hamelin_lib::err::{TranslationError, TranslationErrors};
use hamelin_lib::sql::expression::apply::BinaryOperatorApply;
use hamelin_lib::sql::expression::identifier::HamelinIdentifier;
use hamelin_lib::sql::expression::identifier::{CompoundIdentifier, Identifier};
use hamelin_lib::sql::expression::literal::{ColumnReference, IntegerLiteral, StringLiteral};
use hamelin_lib::sql::expression::regexp::{RegexpCountFunction, RegexpExtractFunction};
use hamelin_lib::sql::expression::{Leaf, SQLExpression};
use hamelin_lib::types::STRING;
pub fn translate(
ctx: &ParseCommandContext<'static>,
pipeline: &HamelinPipeline,
previous: &PendingQuery,
) -> Result<PendingQuery, TranslationErrors> {
let maybe_source = match &ctx.src {
Some(src) => HamelinExpression::new(
src.clone(),
pipeline
.context
.default_expression_translation_context(&previous.env),
)
.translate()
.and_then(|t| {
if t.typ == STRING {
Ok(t.sql)
} else {
Err(TranslationError::msg(
TranslationErrors::expect(ctx, ctx.PARSE_COMMAND())?.as_ref(),
"PARSE source must be a string",
)
.with_context(src.as_ref(), &format!("found {}", t.typ.to_string()))
.into())
}
}),
None => {
let ident: Identifier = CompoundIdentifier::from_two("event", "original").into();
previous
.env
.lookup(&ident)
.map(|_| ColumnReference::new(ident).into())
.map_err(|e| {
TranslationError::msg(ctx, "cannot determine source column for parsing")
.with_source(e)
.single()
})
}
};
let maybe_regex_pattern = {
let pattern_tree = TranslationErrors::expect(ctx, ctx.string())?;
HamelinStringLiteral::new(pattern_tree.clone())
.translate()
.and_then(|t| match t {
SQLExpression::Leaf(Leaf::StringLiteral(s)) => Ok(s.value),
_ => TranslationError::msg(
pattern_tree.as_ref(),
"PARSE pattern must be a string literal",
)
.single_result(),
})
.map(|pattern_string| {
let pattern_adapted_for_last_character_anchor =
if pattern_string.ends_with("*") && !pattern_string.ends_with("**") {
pattern_string + "*"
} else {
pattern_string
};
let special_re_characters = Regex::new(r"[-\[\]{}()+?.,\\^$|#]").unwrap();
special_re_characters
.replace_all(&pattern_adapted_for_last_character_anchor, "\\$0")
.replace("*", "(.*?)")
.replace("(.*?)(.*?)", "(.*)")
})
.and_then(|regex_pattern| {
let anchor_count_in_regex =
regex_pattern.matches("(.*?)").count() + regex_pattern.matches("(.*)").count();
if anchor_count_in_regex == 0 {
Err(TranslationError::msg(
pattern_tree.as_ref(),
"anchor pattern must contain at least one *",
)
.into())
} else if anchor_count_in_regex != ctx.identifier_all().len() {
let mut err = TranslationError::msg(
pattern_tree.as_ref(),
&format!(
"anchor pattern must contain a * for {} new columns",
ctx.identifier_all().len()
),
);
for (i, ident) in ctx.identifier_all().iter().enumerate() {
err = err.with_context(ident.as_ref(), &format!("column {}", i));
}
Err(err.into())
} else {
Ok(format!("(?s){}", regex_pattern))
}
})
};
let (soruce, regex_pattern) = TranslationErrors::from_2(maybe_source, maybe_regex_pattern)?;
let mut new_projection = ProjectionBuilder::default();
for (i, ident) in ctx.identifier_all().iter().enumerate() {
let identifier = HamelinIdentifier::new(ident.clone()).to_sql()?;
let first: Identifier = identifier.first().clone().into();
if let Ok(t) = previous.env.lookup(&first) {
if let Ok(struct_type) = t.try_unwrap_struct() {
if !new_projection.is_present(&first) {
new_projection.initialize_key(identifier.first().clone(), struct_type.clone());
}
}
}
if ident.get_text() != "_" {
new_projection.bind(
identifier,
RegexpExtractFunction::new(
soruce.clone(),
StringLiteral::new(®ex_pattern),
i + 1,
)
.into(),
STRING,
);
}
}
let mut env = Environment::new(new_projection.clone().build_hamelin_type());
env = previous.env.clone().prepend_overwrite(&env);
let mut query = prepend_projections(
&previous.query,
new_projection
.build_projections()
.map_err(|e| TranslationError::wrap_box(ctx, e.into()))?,
&env,
);
if ctx.NODROP().is_none() {
query = add_filter_condition(
&query,
BinaryOperatorApply::new(
Operator::Gt,
RegexpCountFunction::new(soruce, StringLiteral::new(®ex_pattern)).into(),
IntegerLiteral::new("0").into(),
)
.into(),
&env,
);
}
Ok(PendingQuery::new(query, env))
}