use std::sync::Arc;
use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::{
ast::command::Command,
builder::{self, call, gt, let_command, string, where_command, IntoExpressionBuilder},
typed_ast::{
command::{TypedCommand, TypedCommandKind, TypedParseCommand},
context::StatementTranslationContext,
pipeline::TypedPipeline,
},
};
/// Rewrites every `parse` command in `pipeline` into plain `let`/`where` commands.
///
/// If the pipeline contains no `parse` command, the input `Arc` is returned as-is.
/// Otherwise each command is lowered through `lower_command` in order. The resulting
/// commands are assembled into a new pipeline AST that carries the original pipeline's
/// span, and that AST is turned back into a `TypedPipeline` with
/// `TypedPipeline::from_ast_with_context`.
///
/// # Errors
///
/// Propagates any error returned by `valid_ref` on the input pipeline, and any error
/// produced while lowering an individual command.
pub fn lower_parse(
    pipeline: Arc<TypedPipeline>,
    ctx: &mut StatementTranslationContext,
) -> Result<Arc<TypedPipeline>, Arc<TranslationError>> {
    let contains_parse = pipeline
        .valid_ref()?
        .commands
        .iter()
        .any(|cmd| matches!(&cmd.kind, TypedCommandKind::Parse(_)));

    // Fast path: nothing to lower, so hand back the original pipeline untouched.
    if !contains_parse {
        return Ok(pipeline);
    }

    let valid = pipeline.valid_ref()?;
    let mut lowered = builder::pipeline();
    for cmd in &valid.commands {
        // One input command may expand to zero, one, or several output commands.
        lowered = lower_command(cmd, ctx)?
            .into_iter()
            .fold(lowered, |acc, replacement| acc.command(replacement));
    }

    let rebuilt = lowered.build().at(pipeline.ast.span);
    let typed = TypedPipeline::from_ast_with_context(Arc::new(rebuilt), ctx);
    Ok(Arc::new(typed))
}
/// Lowers one typed command into the AST command(s) that replace it.
///
/// `parse` commands are expanded by `lower_parse_command`. Every other command is
/// passed through unchanged, as a clone of its original AST node.
fn lower_command(
    cmd: &Arc<TypedCommand>,
    ctx: &mut StatementTranslationContext,
) -> Result<Vec<Command>, Arc<TranslationError>> {
    match &cmd.kind {
        TypedCommandKind::Parse(parse_cmd) => lower_parse_command(parse_cmd, cmd, ctx),
        _ => Ok(vec![cmd.ast.as_ref().clone()]),
    }
}
/// Lowers a single `parse` command into at most one `let` and at most one `where` command.
///
/// The anchor pattern is translated to a regex by `anchor_pattern_to_regex`.
/// - Each identifier other than `_` becomes a field of the `let`, bound to
///   `regexp_extract(source, regex, i + 1)`, where `i` is the identifier's position.
/// - Unless `nodrop` is set, a `where regexp_count(source, regex) > 0` command follows.
///
/// The source expression is `parse_cmd.source` when present, otherwise `ctx.message_field`.
///
/// The returned vector is empty when no identifier binds a field (all are `_`, or none
/// are given) and `nodrop` is set.
///
/// # Errors
///
/// Propagates the error from validating any identifier (`id.clone().valid()`).
fn lower_parse_command(
    parse_cmd: &TypedParseCommand,
    cmd: &TypedCommand,
    ctx: &mut StatementTranslationContext,
) -> Result<Vec<Command>, Arc<TranslationError>> {
    let regex_pattern = anchor_pattern_to_regex(&parse_cmd.pattern);
    // Parse the explicit source if one was given, otherwise the context's message field.
    let source_expr = match &parse_cmd.source {
        Some(expr) => expr.ast.as_ref().clone(),
        None => ctx.message_field.clone().into_expression_builder().build(),
    };

    let mut let_builder = let_command().at(cmd.ast.span);
    let mut has_fields = false;
    for (i, id) in parse_cmd.identifiers.iter().enumerate() {
        let field_id = id.clone().valid()?;
        // `_` is a throwaway: it binds no field but still advances the group number.
        if field_id.to_string() == "_" {
            continue;
        }
        // Regex capture groups are 1-based; identifier `i` reads group `i + 1`.
        // The identifier count is tiny, so the usize -> i64 cast cannot truncate.
        let group_num = (i + 1) as i64;
        let extract_expr = call("regexp_extract")
            .arg(source_expr.clone())
            .arg(string(&regex_pattern))
            .arg(group_num);
        let_builder = let_builder.named_field(field_id, extract_expr);
        has_fields = true;
    }

    let mut lowered = Vec::with_capacity(2);
    // Only emit the `let` when it actually binds something.
    if has_fields {
        lowered.push(let_builder.build());
    }
    // Without `nodrop`, keep only rows whose source matches the pattern at least once.
    if !parse_cmd.nodrop {
        lowered.push(
            where_command(gt(
                call("regexp_count")
                    .arg(source_expr)
                    .arg(string(&regex_pattern)),
                0,
            ))
            .at(cmd.ast.span)
            .build(),
        );
    }
    Ok(lowered)
}
/// Translates an anchor pattern (literal text with `*` wildcards) into a regex string.
///
/// Rules:
/// - Characters in `REGEX_META` are backslash-escaped.
/// - Every `*` becomes a lazy capture `(.*?)`.
/// - Each adjacent pair of captures, i.e. `**`, is merged into one greedy capture `(.*)`.
/// - A single trailing `*` is first promoted to `**`, so the last capture is greedy.
///
/// The result is prefixed with the `(?s)` inline flag so `.` also matches newlines.
///
/// NOTE(review): runs of three or more `*` yield mixed groups, e.g. `***` becomes
/// `(.*)(.*?)`. Confirm that this is the intended grouping.
fn anchor_pattern_to_regex(pattern: &str) -> String {
    const REGEX_META: &[char] = &[
        '-', '[', ']', '{', '}', '(', ')', '+', '?', '.', ',', '\\', '^', '$', '|', '#',
    ];

    // Promote a lone trailing `*` to `**`; the collapse step below then makes it greedy.
    let promote_tail = pattern.ends_with('*') && !pattern.ends_with("**");
    let source_chars = pattern.chars().chain(promote_tail.then_some('*'));

    let mut escaped = String::with_capacity(pattern.len() * 2 + 1);
    for ch in source_chars {
        if REGEX_META.contains(&ch) {
            escaped.push('\\');
        }
        escaped.push(ch);
    }

    // Wildcards become lazy captures; adjacent lazy pairs merge into one greedy capture.
    let lazy = escaped.replace('*', "(.*?)");
    let greedy = lazy.replace("(.*?)(.*?)", "(.*)");
    format!("(?s){greedy}")
}
// Unit tests for `anchor_pattern_to_regex` and end-to-end tests for the `lower_parse` pass.
#[cfg(test)]
mod tests {
use super::*;
// A single interior wildcard becomes one lazy capture; `-` is escaped.
#[test]
fn test_anchor_pattern_to_regex_simple() {
assert_eq!(
anchor_pattern_to_regex("prefix-*-suffix"),
"(?s)prefix\\-(.*?)\\-suffix"
);
}
// Each interior wildcard gets its own lazy capture group, in order.
#[test]
fn test_anchor_pattern_to_regex_multiple() {
assert_eq!(
anchor_pattern_to_regex("a-*-b-*-c"),
"(?s)a\\-(.*?)\\-b\\-(.*?)\\-c"
);
}
// A trailing `*` is promoted to a greedy capture.
#[test]
fn test_anchor_pattern_to_regex_trailing_star() {
assert_eq!(anchor_pattern_to_regex("prefix-*"), "(?s)prefix\\-(.*)");
}
// `.` is escaped; `=` is not in the escape set and passes through unchanged.
#[test]
fn test_anchor_pattern_to_regex_escapes_metacharacters() {
assert_eq!(
anchor_pattern_to_regex("user.name=*"),
"(?s)user\\.name=(.*)"
);
}
// With no wildcards the pattern is only escaped and given the `(?s)` prefix.
#[test]
fn test_anchor_pattern_to_regex_no_wildcards() {
assert_eq!(anchor_pattern_to_regex("literal"), "(?s)literal");
}
// `**` collapses into a single greedy capture group.
#[test]
fn test_consecutive_captures_collapse() {
assert_eq!(anchor_pattern_to_regex("**"), "(?s)(.*)");
}
// Imports for the end-to-end `lower_parse` tests below.
use hamelin_lib::type_check;
use hamelin_lib::{
tree::{
ast::pipeline::Pipeline,
builder::{
call, field_ref, gt, let_command, parse_command, pipeline, select_command, string,
where_command,
},
},
types::{struct_type::Struct, INT, STRING},
};
use pretty_assertions::assert_eq;
use rstest::rstest;
use std::sync::Arc;
// Each case is (input pipeline, expected lowered pipeline, expected output schema).
#[rstest]
// Pipelines without a `parse` command are returned unchanged.
#[case::no_parse_passthrough(
pipeline()
.command(select_command().named_field("a", 1).named_field("b", 2).build())
.build(),
pipeline()
.command(select_command().named_field("a", 1).named_field("b", 2).build())
.build(),
Struct::default().with_str("a", INT).with_str("b", INT)
)]
// Default mode: a `let` with one `regexp_extract` per field, then a `regexp_count` `where` filter.
#[case::parse_with_source(
pipeline()
.command(select_command().named_field("msg", "prefix-hello-suffix").build())
.command(parse_command().pattern("prefix-*-suffix").identifier("value").source(field_ref("msg")).build())
.build(),
pipeline()
.command(select_command().named_field("msg", "prefix-hello-suffix").build())
.command(let_command()
.named_field("value", call("regexp_extract")
.arg(field_ref("msg"))
.arg(string("(?s)prefix\\-(.*?)\\-suffix"))
.arg(1))
.build())
.command(where_command(gt(
call("regexp_count")
.arg(field_ref("msg"))
.arg(string("(?s)prefix\\-(.*?)\\-suffix")),
0
)).build())
.build(),
Struct::default().with_str("value", STRING).with_str("msg", STRING)
)]
// `nodrop`: only the `let` is emitted; no `where` filter follows it.
#[case::parse_with_nodrop(
pipeline()
.command(select_command().named_field("msg", "user=test").build())
.command(parse_command().pattern("user=*").identifier("user").source(field_ref("msg")).nodrop(true).build())
.build(),
pipeline()
.command(select_command().named_field("msg", "user=test").build())
.command(let_command()
.named_field("user", call("regexp_extract")
.arg(field_ref("msg"))
.arg(string("(?s)user=(.*)"))
.arg(1))
.build())
.build(),
Struct::default().with_str("user", STRING).with_str("msg", STRING)
)]
// Each identifier extracts its own 1-based capture group from the same regex.
#[case::parse_multiple_captures(
pipeline()
.command(select_command().named_field("msg", "a-val1-b-val2-c").build())
.command(parse_command().pattern("a-*-b-*-c").identifier("x").identifier("y").source(field_ref("msg")).build())
.build(),
pipeline()
.command(select_command().named_field("msg", "a-val1-b-val2-c").build())
.command(let_command()
.named_field("x", call("regexp_extract")
.arg(field_ref("msg"))
.arg(string("(?s)a\\-(.*?)\\-b\\-(.*?)\\-c"))
.arg(1))
.named_field("y", call("regexp_extract")
.arg(field_ref("msg"))
.arg(string("(?s)a\\-(.*?)\\-b\\-(.*?)\\-c"))
.arg(2))
.build())
.command(where_command(gt(
call("regexp_count")
.arg(field_ref("msg"))
.arg(string("(?s)a\\-(.*?)\\-b\\-(.*?)\\-c")),
0
)).build())
.build(),
Struct::default().with_str("x", STRING).with_str("y", STRING).with_str("msg", STRING)
)]
// `_` binds no field but still consumes a group number, so `val` reads group 2.
#[case::parse_throwaway_column(
pipeline()
.command(select_command().named_field("msg", "skip-keep-end").build())
.command(parse_command().pattern("*-*-end").identifier("_").identifier("val").source(field_ref("msg")).build())
.build(),
pipeline()
.command(select_command().named_field("msg", "skip-keep-end").build())
.command(let_command()
.named_field("val", call("regexp_extract")
.arg(field_ref("msg"))
.arg(string("(?s)(.*?)\\-(.*?)\\-end"))
.arg(2))
.build())
.command(where_command(gt(
call("regexp_count")
.arg(field_ref("msg"))
.arg(string("(?s)(.*?)\\-(.*?)\\-end")),
0
)).build())
.build(),
Struct::default().with_str("val", STRING).with_str("msg", STRING)
)]
// No bound fields: the `let` is omitted and only the `where` filter remains.
#[case::parse_all_throwaway(
pipeline()
.command(select_command().named_field("msg", "prefix-data-suffix").build())
.command(parse_command().pattern("prefix-*-suffix").identifier("_").source(field_ref("msg")).build())
.build(),
pipeline()
.command(select_command().named_field("msg", "prefix-data-suffix").build())
.command(where_command(gt(
call("regexp_count")
.arg(field_ref("msg"))
.arg(string("(?s)prefix\\-(.*?)\\-suffix")),
0
)).build())
.build(),
Struct::default().with_str("msg", STRING)
)]
// No bound fields and `nodrop`: the `parse` command lowers to nothing.
#[case::parse_all_throwaway_nodrop(
pipeline()
.command(select_command().named_field("msg", "prefix-data-suffix").build())
.command(parse_command().pattern("prefix-*-suffix").identifier("_").source(field_ref("msg")).nodrop(true).build())
.build(),
pipeline()
.command(select_command().named_field("msg", "prefix-data-suffix").build())
.build(),
Struct::default().with_str("msg", STRING)
)]
// Type-checks both pipelines and lowers the input with a default context.
// Then compares the lowered AST with the expected AST, and the lowered output schema with the expected schema.
fn test_lower_parse(
#[case] input: Pipeline,
#[case] expected: Pipeline,
#[case] expected_output_schema: Struct,
) {
let input_typed = type_check(input).output;
let expected_typed = type_check(expected).output;
let mut ctx = StatementTranslationContext::default();
let result = lower_parse(Arc::new(input_typed), &mut ctx).unwrap();
assert_eq!(result.ast, expected_typed.ast);
let result_schema = result.environment().as_struct().clone();
assert_eq!(result_schema, expected_output_schema);
}
}