use std::cell::RefCell;
use std::collections::HashMap;
use proc_macro2::{Ident, Span, TokenStream};
use quote::{format_ident, quote};
use syn::Index;
use crate::parser::{ArithmeticOperator, BuiltinOperator, ComparisonOperator, ConstType, DataType};
use crate::planner::{
ArithmeticArgument, ComparisonExprArgument, Constraints, FactorArgument,
FnCallPredicateArgument, TransformationArgument,
};
use crate::codegen::CodeGen;
use crate::codegen::CodegenError;
use crate::codegen::tuple_tokens;
pub(super) fn row_pattern_and_fields(
arity: usize,
key_args: &[ArithmeticArgument],
value_args: &[ArithmeticArgument],
compares: &[ComparisonExprArgument],
fn_call_preds: &[FnCallPredicateArgument],
constraints: &Constraints,
) -> (TokenStream, Vec<Ident>) {
if arity == 0 {
return (quote! { () }, Vec::new());
}
let used = compute_row_params_tokens(
arity,
key_args,
value_args,
compares,
fn_call_preds,
constraints,
);
let fields: Vec<Ident> = (0..arity)
.map(|idx| {
if used[idx] {
format_ident!("x{}", idx)
} else {
format_ident!("_x{}", idx)
}
})
.collect();
let pat = tuple_tokens(fields.iter().map(|f| quote! { #f }));
(pat, fields)
}
pub(super) fn row_use_counts(arg_lists: &[&[ArithmeticArgument]]) -> HashMap<usize, usize> {
arg_lists
.iter()
.flat_map(|args| args.iter())
.flat_map(|arg| arg.transformation_arguments())
.filter_map(|ta| match ta {
TransformationArgument::KV((_, idx)) => Some(*idx),
_ => None,
})
.fold(HashMap::new(), |mut acc, idx| {
*acc.entry(idx).or_insert(0) += 1;
acc
})
}
pub(super) fn kv_use_counts(arg_lists: &[&[ArithmeticArgument]]) -> HashMap<(bool, usize), usize> {
arg_lists
.iter()
.flat_map(|args| args.iter())
.flat_map(|arg| arg.transformation_arguments())
.map(|ta| match ta {
TransformationArgument::KV((is_key, idx))
| TransformationArgument::Jn((_, is_key, idx)) => (*is_key, *idx),
})
.fold(HashMap::new(), |mut acc, key| {
*acc.entry(key).or_insert(0) += 1;
acc
})
}
impl CodeGen {
pub(super) fn build_key_val_from_row_args(
&mut self,
args: &[ArithmeticArgument],
fields: &[Ident],
string_intern: bool,
remaining: Option<&RefCell<HashMap<usize, usize>>>,
) -> Result<TokenStream, CodegenError> {
let parts: Vec<TokenStream> = args
.iter()
.map(|arg| self.build_row_args_arithmetic_expr(arg, fields, string_intern, remaining))
.collect::<Result<_, _>>()?;
Ok(tuple_tokens(parts))
}
pub(super) fn build_key_val_from_kv_args(
&mut self,
args: &[ArithmeticArgument],
string_intern: bool,
remaining: Option<&RefCell<HashMap<(bool, usize), usize>>>,
) -> Result<TokenStream, CodegenError> {
let parts: Vec<TokenStream> = args
.iter()
.map(|a| self.build_kv_args_arithmetic_expr(a, string_intern, remaining))
.collect::<Result<_, _>>()?;
Ok(tuple_tokens(parts))
}
pub(super) fn build_key_val_from_join_args(
&mut self,
args: &[ArithmeticArgument],
string_intern: bool,
) -> Result<TokenStream, CodegenError> {
let parts: Vec<TokenStream> = args
.iter()
.map(|a| self.build_join_args_arithmetic_expr(a, string_intern))
.collect::<Result<_, _>>()?;
Ok(tuple_tokens(parts))
}
}
fn compute_row_params_tokens(
arity: usize,
key_args: &[ArithmeticArgument],
value_args: &[ArithmeticArgument],
compares: &[ComparisonExprArgument],
fn_call_preds: &[FnCallPredicateArgument],
constraints: &Constraints,
) -> Vec<bool> {
let mut used = vec![false; arity];
let mut mark = |arg: &TransformationArgument| {
if let TransformationArgument::KV((_, idx)) = arg
&& let Some(slot) = used.get_mut(*idx)
{
*slot = true;
}
};
let mut inspect = |expr: &ArithmeticArgument| {
for trans_arg in expr.transformation_arguments() {
mark(trans_arg);
}
};
for expr in key_args.iter().chain(value_args.iter()) {
inspect(expr);
}
for cmp in compares {
inspect(cmp.left());
inspect(cmp.right());
}
for fc in fn_call_preds {
for arg in fc.args() {
inspect(arg);
}
}
for (arg, _) in constraints.constant_eq_constraints().as_ref().iter() {
mark(arg);
}
for (left, right) in constraints.variable_eq_constraints().as_ref().iter() {
mark(left);
mark(right);
}
used
}
fn param_ident(used: bool, name: &str) -> TokenStream {
let id = format_ident!("{}{}", if used { "" } else { "_" }, name);
quote! { #id }
}
pub(super) fn compute_join_param_tokens(
key_args: &[ArithmeticArgument],
value_args: &[ArithmeticArgument],
compares: &[ComparisonExprArgument],
fn_call_preds: &[FnCallPredicateArgument],
) -> (TokenStream, TokenStream, TokenStream) {
let (mut use_k, mut use_lv, mut use_rv) = (false, false, false);
let mut mark = |arg: &TransformationArgument| {
if let TransformationArgument::Jn((is_left, is_key, _)) = arg {
if *is_key {
use_k = true;
} else if *is_left {
use_lv = true;
} else {
use_rv = true;
}
}
};
let mut inspect = |expr: &ArithmeticArgument| {
for trans_arg in expr.transformation_arguments() {
mark(trans_arg);
}
};
for a in key_args.iter().chain(value_args.iter()) {
inspect(a);
}
for cmp in compares {
inspect(cmp.left());
inspect(cmp.right());
}
for fc in fn_call_preds {
for arg in fc.args() {
inspect(arg);
}
}
(
param_ident(use_k, "k"),
param_ident(use_lv, "lv"),
param_ident(use_rv, "rv"),
)
}
pub(super) fn compute_kv_param_tokens(
key_args: &[ArithmeticArgument],
value_args: &[ArithmeticArgument],
compares: &[ComparisonExprArgument],
fn_call_preds: &[FnCallPredicateArgument],
constraints: Option<&Constraints>,
) -> (TokenStream, TokenStream) {
let (mut use_k, mut use_v) = (false, false);
let mut mark = |arg: &TransformationArgument| {
let is_key = match arg {
TransformationArgument::KV((is_key, _))
| TransformationArgument::Jn((_, is_key, _)) => *is_key,
};
if is_key {
use_k = true;
} else {
use_v = true;
}
};
let mut inspect = |expr: &ArithmeticArgument| {
for trans_arg in expr.transformation_arguments() {
mark(trans_arg);
}
};
for arg in key_args.iter().chain(value_args.iter()) {
inspect(arg);
}
for cmp in compares {
inspect(cmp.left());
inspect(cmp.right());
}
for fc in fn_call_preds {
for arg in fc.args() {
inspect(arg);
}
}
if let Some(cons) = constraints {
for (arg, _) in cons.constant_eq_constraints().as_ref().iter() {
mark(arg);
}
for (left, right) in cons.variable_eq_constraints().as_ref().iter() {
mark(left);
mark(right);
}
}
(param_ident(use_k, "k"), param_ident(use_v, "v"))
}
fn comparison_op_tokens(op: &ComparisonOperator) -> TokenStream {
match op {
ComparisonOperator::Equal => quote! { == },
ComparisonOperator::NotEqual => quote! { != },
ComparisonOperator::GreaterThan => quote! { > },
ComparisonOperator::GreaterEqualThan => quote! { >= },
ComparisonOperator::LessThan => quote! { < },
ComparisonOperator::LessEqualThan => quote! { <= },
}
}
impl CodeGen {
pub(super) fn build_kv_compare_predicate(
&mut self,
comps: &[ComparisonExprArgument],
string_intern: bool,
input_type: &(Vec<DataType>, Vec<DataType>),
) -> Result<Option<TokenStream>, CodegenError> {
if comps.is_empty() {
return Ok(None);
}
let parts: Vec<TokenStream> = comps
.iter()
.map(|c| {
let l = self.build_kv_args_arithmetic_expr(c.left(), string_intern, None)?;
let r = self.build_kv_args_arithmetic_expr(c.right(), string_intern, None)?;
let op = comparison_op_tokens(c.operator());
Ok(
if string_intern
&& c.operator().is_inequality()
&& self.infer_expr_type(c.left(), input_type, None)? == DataType::String
&& self.infer_expr_type(c.right(), input_type, None)? == DataType::String
{
self.features.mark_string_resolve();
quote! { resolve(#l) #op resolve(#r) }
} else {
quote! { (#l) #op (#r) }
},
)
})
.collect::<Result<_, CodegenError>>()?;
Ok(Some(quote! { #( #parts )&&* }))
}
pub(super) fn build_join_compare_predicate(
&mut self,
comps: &[ComparisonExprArgument],
string_intern: bool,
left_type: &(Vec<DataType>, Vec<DataType>),
right_type: &(Vec<DataType>, Vec<DataType>),
) -> Result<Option<TokenStream>, CodegenError> {
if comps.is_empty() {
return Ok(None);
}
let parts: Vec<TokenStream> = comps
.iter()
.map(|c| {
let l = self.build_join_args_arithmetic_expr(c.left(), string_intern)?;
let r = self.build_join_args_arithmetic_expr(c.right(), string_intern)?;
let op = comparison_op_tokens(c.operator());
Ok(
if string_intern
&& c.operator().is_inequality()
&& self.infer_expr_type(c.left(), left_type, Some(right_type))?
== DataType::String
&& self.infer_expr_type(c.right(), left_type, Some(right_type))?
== DataType::String
{
self.features.mark_string_resolve();
quote! { resolve(#l) #op resolve(#r) }
} else {
quote! { (#l) #op (#r) }
},
)
})
.collect::<Result<_, CodegenError>>()?;
Ok(Some(quote! { #( #parts )&&* }))
}
pub(super) fn build_row_compare_predicate(
&mut self,
comps: &[ComparisonExprArgument],
row_fields: &[Ident],
string_intern: bool,
input_type: &(Vec<DataType>, Vec<DataType>),
) -> Result<Option<TokenStream>, CodegenError> {
if comps.is_empty() {
return Ok(None);
}
let parts: Vec<TokenStream> = comps
.iter()
.map(|c| {
let l =
self.build_row_args_arithmetic_expr(c.left(), row_fields, string_intern, None)?;
let r = self.build_row_args_arithmetic_expr(
c.right(),
row_fields,
string_intern,
None,
)?;
let op = comparison_op_tokens(c.operator());
Ok(
if string_intern
&& c.operator().is_inequality()
&& self.infer_expr_type(c.left(), input_type, None)? == DataType::String
&& self.infer_expr_type(c.right(), input_type, None)? == DataType::String
{
self.features.mark_string_resolve();
quote! { resolve(#l) #op resolve(#r) }
} else {
quote! { #l #op #r }
},
)
})
.collect::<Result<_, CodegenError>>()?;
Ok(Some(quote! { #( #parts )&&* }))
}
}
impl CodeGen {
pub(super) fn build_kv_fn_call_predicate(
&mut self,
fn_calls: &[FnCallPredicateArgument],
string_intern: bool,
) -> Result<Option<TokenStream>, CodegenError> {
if fn_calls.is_empty() {
return Ok(None);
}
let parts: Vec<TokenStream> = fn_calls
.iter()
.map(|fc| {
let fn_name = format_ident!("{}", fc.name());
let param_types = self.udf_param_types(fc.name());
if string_intern && param_types.contains(&DataType::String) {
self.features.mark_string_resolve();
}
let args: Vec<TokenStream> = fc
.args()
.iter()
.enumerate()
.map(|(i, a)| {
let token = self.build_kv_args_arithmetic_expr(a, string_intern, None)?;
Ok(wrap_udf_arg(
token,
param_type_at(¶m_types, i),
string_intern,
))
})
.collect::<Result<_, CodegenError>>()?;
Ok(if fc.is_negated() {
quote! { !udf::#fn_name(#( #args ),*) }
} else {
quote! { udf::#fn_name(#( #args ),*) }
})
})
.collect::<Result<_, CodegenError>>()?;
Ok(Some(quote! { #( #parts )&&* }))
}
pub(super) fn build_join_fn_call_predicate(
&mut self,
fn_calls: &[FnCallPredicateArgument],
string_intern: bool,
) -> Result<Option<TokenStream>, CodegenError> {
if fn_calls.is_empty() {
return Ok(None);
}
let parts: Vec<TokenStream> = fn_calls
.iter()
.map(|fc| {
let fn_name = format_ident!("{}", fc.name());
let param_types = self.udf_param_types(fc.name());
if string_intern && param_types.contains(&DataType::String) {
self.features.mark_string_resolve();
}
let args: Vec<TokenStream> = fc
.args()
.iter()
.enumerate()
.map(|(i, a)| {
let token = self.build_join_args_arithmetic_expr(a, string_intern)?;
Ok(wrap_udf_arg(
token,
param_type_at(¶m_types, i),
string_intern,
))
})
.collect::<Result<_, CodegenError>>()?;
Ok(if fc.is_negated() {
quote! { !udf::#fn_name(#( #args ),*) }
} else {
quote! { udf::#fn_name(#( #args ),*) }
})
})
.collect::<Result<_, CodegenError>>()?;
Ok(Some(quote! { #( #parts )&&* }))
}
pub(super) fn build_row_fn_call_predicate(
&mut self,
fn_calls: &[FnCallPredicateArgument],
row_fields: &[Ident],
string_intern: bool,
) -> Result<Option<TokenStream>, CodegenError> {
if fn_calls.is_empty() {
return Ok(None);
}
let parts: Vec<TokenStream> = fn_calls
.iter()
.map(|fc| {
let fn_name = format_ident!("{}", fc.name());
let param_types = self.udf_param_types(fc.name());
if string_intern && param_types.contains(&DataType::String) {
self.features.mark_string_resolve();
}
let args: Vec<TokenStream> = fc
.args()
.iter()
.enumerate()
.map(|(i, a)| {
let token = self.build_row_args_arithmetic_expr(
a,
row_fields,
string_intern,
None,
)?;
Ok(wrap_udf_arg(
token,
param_type_at(¶m_types, i),
string_intern,
))
})
.collect::<Result<_, CodegenError>>()?;
Ok(if fc.is_negated() {
quote! { !udf::#fn_name(#( #args ),*) }
} else {
quote! { udf::#fn_name(#( #args ),*) }
})
})
.collect::<Result<_, CodegenError>>()?;
Ok(Some(quote! { #( #parts )&&* }))
}
}
pub(super) fn build_kv_constraints_predicate(
constraints: &Constraints,
string_intern: bool,
) -> Result<Option<TokenStream>, CodegenError> {
let mut parts: Vec<TokenStream> = constraints
.constant_eq_constraints()
.as_ref()
.iter()
.map(|(arg, c)| {
let lhs = trans_arg_to_kv_expr(arg)?;
let rhs = const_to_token(c, string_intern)?;
Ok(quote! { #lhs == #rhs })
})
.collect::<Result<_, CodegenError>>()?;
let var_parts = constraints
.variable_eq_constraints()
.as_ref()
.iter()
.map(|(l, r)| {
let lhs = trans_arg_to_kv_expr(l)?;
let rhs = trans_arg_to_kv_expr(r)?;
Ok(quote! { #lhs == #rhs })
})
.collect::<Result<Vec<_>, CodegenError>>()?;
parts.extend(var_parts);
Ok((!parts.is_empty()).then(|| quote! { #( #parts )&&* }))
}
pub(super) fn build_row_constraints_predicate(
constraints: &Constraints,
row_fields: &[Ident],
string_intern: bool,
) -> Result<Option<TokenStream>, CodegenError> {
let mut parts: Vec<TokenStream> = constraints
.constant_eq_constraints()
.as_ref()
.iter()
.map(|(arg, c)| {
let lhs = trans_arg_to_row_expr(arg, row_fields)?;
let rhs = const_to_token(c, string_intern)?;
Ok(quote! { #lhs == #rhs })
})
.collect::<Result<_, CodegenError>>()?;
let var_parts = constraints
.variable_eq_constraints()
.as_ref()
.iter()
.map(|(l, r)| {
let lhs = trans_arg_to_row_expr(l, row_fields)?;
let rhs = trans_arg_to_row_expr(r, row_fields)?;
Ok(quote! { #lhs == #rhs })
})
.collect::<Result<Vec<_>, CodegenError>>()?;
parts.extend(var_parts);
Ok((!parts.is_empty()).then(|| quote! { #( #parts )&&* }))
}
pub fn const_to_token(
constant: &ConstType,
string_intern: bool,
) -> Result<TokenStream, CodegenError> {
Ok(match constant {
ConstType::Int(_) | ConstType::Float(_) => {
return Err(CodegenError::internal(format!(
"polymorphic literal {constant:?} reached codegen; \
typechecker should have pinned it"
)));
}
ConstType::Int8(n) => {
let lit = proc_macro2::Literal::i8_unsuffixed(*n);
quote! { #lit }
}
ConstType::Int16(n) => {
let lit = proc_macro2::Literal::i16_unsuffixed(*n);
quote! { #lit }
}
ConstType::Int32(n) => {
let lit = proc_macro2::Literal::i32_unsuffixed(*n);
quote! { #lit }
}
ConstType::Int64(n) => {
let lit = proc_macro2::Literal::i64_unsuffixed(*n);
quote! { #lit }
}
ConstType::UInt8(n) => {
let lit = proc_macro2::Literal::u8_unsuffixed(*n);
quote! { #lit }
}
ConstType::UInt16(n) => {
let lit = proc_macro2::Literal::u16_unsuffixed(*n);
quote! { #lit }
}
ConstType::UInt32(n) => {
let lit = proc_macro2::Literal::u32_unsuffixed(*n);
quote! { #lit }
}
ConstType::UInt64(n) => {
let lit = proc_macro2::Literal::u64_unsuffixed(*n);
quote! { #lit }
}
ConstType::Float32(v) => {
let lit = proc_macro2::Literal::f32_unsuffixed(v.into_inner());
quote! { OrderedFloat(#lit) }
}
ConstType::Float64(v) => {
let lit = proc_macro2::Literal::f64_unsuffixed(v.into_inner());
quote! { OrderedFloat(#lit) }
}
ConstType::Text(s) => {
if string_intern {
quote! { intern(#s) }
} else {
quote! { #s.to_string() }
}
}
ConstType::Bool(b) => quote! { #b },
})
}
fn trans_arg_to_kv_expr(arg: &TransformationArgument) -> Result<TokenStream, CodegenError> {
match arg {
TransformationArgument::KV((is_key, idx)) => {
let i = Index::from(*idx);
Ok(if *is_key {
quote! { k.#i }
} else {
quote! { v.#i }
})
}
_ => Err(CodegenError::internal(format!(
"non-KV transformation argument ({arg:?}) in KV-constraint builder"
))),
}
}
fn trans_arg_to_row_expr(
arg: &TransformationArgument,
fields: &[Ident],
) -> Result<TokenStream, CodegenError> {
match arg {
TransformationArgument::KV((_, idx)) => {
let ident = fields.get(*idx).ok_or_else(|| {
CodegenError::internal(format!(
"row index {idx} out of bounds (row arity {})",
fields.len()
))
})?;
Ok(quote! { #ident })
}
_ => Err(CodegenError::internal(format!(
"non-KV transformation argument ({arg:?}) in row-constraint builder"
))),
}
}
pub(super) fn combine_predicates(preds: Vec<Option<TokenStream>>) -> Option<TokenStream> {
preds
.into_iter()
.flatten()
.reduce(|a, b| quote! { (#a) && (#b) })
}
fn numeric_arithmetic_op_tokens(op: &ArithmeticOperator) -> TokenStream {
match op {
ArithmeticOperator::Plus => quote! { + },
ArithmeticOperator::Minus => quote! { - },
ArithmeticOperator::Multiply => quote! { * },
ArithmeticOperator::Divide => quote! { / },
ArithmeticOperator::Modulo => quote! { % },
}
}
fn build_cat_batch(factors: Vec<TokenStream>, string_intern: bool) -> TokenStream {
debug_assert!(factors.len() >= 2, "cat requires at least 2 factors");
let fmt_str = "{}".repeat(factors.len());
if string_intern {
quote! { intern(&format!(#fmt_str, #(#factors),*)) }
} else {
quote! { format!(#fmt_str, #(#factors),*) }
}
}
impl CodeGen {
fn build_arithmetic_expr<F>(
&mut self,
expr: &ArithmeticArgument,
string_intern: bool,
resolve_var: &F,
) -> Result<TokenStream, CodegenError>
where
F: Fn(&TransformationArgument) -> Result<TokenStream, CodegenError>,
{
let rest = expr.rest();
let mut result = self.factor_to_token(expr.init(), string_intern, resolve_var)?;
for (i, (op, factor)) in rest.iter().enumerate() {
let op_token = numeric_arithmetic_op_tokens(op);
let factor_token = self.factor_to_token(factor, string_intern, resolve_var)?;
result = if i < rest.len() - 1 {
quote! { ( #result #op_token #factor_token ) }
} else {
quote! { #result #op_token #factor_token }
};
}
Ok(result)
}
fn fncall_to_token<F>(
&mut self,
name: &str,
args: &[ArithmeticArgument],
string_intern: bool,
resolve_var: &F,
) -> Result<TokenStream, CodegenError>
where
F: Fn(&TransformationArgument) -> Result<TokenStream, CodegenError>,
{
let fn_ident = format_ident!("{}", name);
let param_types = self.udf_param_types(name);
let arg_tokens: Vec<TokenStream> = args
.iter()
.enumerate()
.map(|(i, a)| {
let token = self.build_arithmetic_expr(a, string_intern, resolve_var)?;
Ok(wrap_udf_arg(
token,
param_type_at(¶m_types, i),
string_intern,
))
})
.collect::<Result<_, CodegenError>>()?;
let returns_string = self
.udf_return_type(name)
.is_some_and(|t| t == DataType::String);
if string_intern && param_types.contains(&DataType::String) {
self.features.mark_string_resolve();
}
if string_intern && returns_string {
self.features.mark_string_intern();
}
self.features.mark_udf();
let call = quote! { udf::#fn_ident(#(#arg_tokens),*) };
Ok(if string_intern && returns_string {
quote! { intern(&#call) }
} else {
call
})
}
fn factor_to_token<F>(
&mut self,
factor: &FactorArgument,
string_intern: bool,
resolve_var: &F,
) -> Result<TokenStream, CodegenError>
where
F: Fn(&TransformationArgument) -> Result<TokenStream, CodegenError>,
{
match factor {
FactorArgument::Var(arg) => resolve_var(arg),
FactorArgument::Const(c) => const_to_token(c, string_intern),
FactorArgument::FnCall { name, args } => {
self.fncall_to_token(name, args, string_intern, resolve_var)
}
FactorArgument::Builtin { op, args } => {
self.builtin_to_token(*op, args, string_intern, resolve_var)
}
FactorArgument::Group(a) => {
let inner = self.build_arithmetic_expr(a, string_intern, resolve_var)?;
Ok(quote! { ( #inner ) })
}
}
}
fn factor_to_display_token<F>(
&mut self,
factor: &FactorArgument,
string_intern: bool,
resolve_var: &F,
) -> Result<TokenStream, CodegenError>
where
F: Fn(&TransformationArgument) -> Result<TokenStream, CodegenError>,
{
match factor {
FactorArgument::Var(arg) => {
let var_token = resolve_var(arg)?;
Ok(if string_intern {
self.features.mark_string_resolve();
quote! { resolve(#var_token) }
} else {
var_token
})
}
FactorArgument::Const(c) => match c {
ConstType::Text(s) => Ok(quote! { #s }),
_ => const_to_token(c, string_intern),
},
FactorArgument::FnCall { name, args } => {
let call = self.fncall_to_token(name, args, string_intern, resolve_var)?;
Ok(if string_intern {
self.features.mark_string_resolve();
quote! { resolve(#call) }
} else {
call
})
}
FactorArgument::Builtin { op, args } => {
let call = self.builtin_to_token(*op, args, string_intern, resolve_var)?;
Ok(if string_intern && op.ret_type() == DataType::String {
self.features.mark_string_resolve();
quote! { resolve(#call) }
} else {
call
})
}
FactorArgument::Group(a) => {
let inner = self.build_arithmetic_expr(a, string_intern, resolve_var)?;
Ok(quote! { ( #inner ) })
}
}
}
fn builtin_to_token<F>(
&mut self,
op: BuiltinOperator,
args: &[ArithmeticArgument],
string_intern: bool,
resolve_var: &F,
) -> Result<TokenStream, CodegenError>
where
F: Fn(&TransformationArgument) -> Result<TokenStream, CodegenError>,
{
if matches!(op, BuiltinOperator::Cat) {
debug_assert_eq!(args.len(), 2, "cat() arity is enforced at parse time");
let factors: Vec<TokenStream> = args
.iter()
.map(|a| {
debug_assert!(
a.rest.is_empty(),
"cat() arg is a single factor after typecheck"
);
self.factor_to_display_token(&a.init, string_intern, resolve_var)
})
.collect::<Result<_, _>>()?;
return Ok(build_cat_batch(factors, string_intern));
}
let raw: Vec<TokenStream> = args
.iter()
.map(|a| self.build_arithmetic_expr(a, string_intern, resolve_var))
.collect::<Result<_, _>>()?;
if string_intern && op.param_types().contains(&DataType::String) {
self.features.mark_string_resolve();
}
let read_str = |t: &TokenStream| -> TokenStream {
if string_intern {
quote! { resolve(#t) }
} else {
quote! { (#t).as_str() }
}
};
let emit_string = |body: TokenStream| -> TokenStream {
if string_intern {
quote! { intern(&#body) }
} else {
body
}
};
match op {
BuiltinOperator::Strlen => {
let s = read_str(&raw[0]);
Ok(quote! { ((#s).chars().count() as i32) })
}
BuiltinOperator::Substr => {
let s = read_str(&raw[0]);
let start = &raw[1];
let len = &raw[2];
Ok(emit_string(quote! {
(#s).chars().skip((#start) as usize).take((#len) as usize).collect::<String>()
}))
}
BuiltinOperator::Ord => {
debug_assert!(string_intern);
let s = &raw[0];
Ok(quote! { ((#s).into_inner().get() as i32) })
}
BuiltinOperator::Contains => {
let needle = read_str(&raw[0]);
let hay = read_str(&raw[1]);
Ok(quote! { ((#hay).contains(#needle)) })
}
BuiltinOperator::Match => {
let hay = read_str(&raw[1]);
if let FactorArgument::Const(ConstType::Text(p)) = &args[0].init
&& args[0].rest.is_empty()
{
let anchored = format!("^(?:{p})$");
return Ok(quote! {{
static RE: ::std::sync::LazyLock<
Option<::flowlog_runtime::regex::Regex>,
> = ::std::sync::LazyLock::new(|| {
::flowlog_runtime::regex::Regex::new(#anchored).ok()
});
RE.as_ref().is_some_and(|re| re.is_match(#hay))
}});
}
let pat = read_str(&raw[0]);
Ok(quote! {
::flowlog_runtime::regex::Regex::new(&format!("^(?:{})$", #pat))
.map_or(false, |re| re.is_match(#hay))
})
}
BuiltinOperator::ToString => {
let n = &raw[0];
Ok(emit_string(quote! { (#n).to_string() }))
}
BuiltinOperator::ToNumber => {
let s = read_str(&raw[0]);
Ok(quote! { ((#s).parse::<i32>().unwrap_or(0)) })
}
BuiltinOperator::Cat => unreachable!("cat handled early in builtin_to_token"),
}
}
fn build_row_args_arithmetic_expr(
&mut self,
expr: &ArithmeticArgument,
fields: &[Ident],
string_intern: bool,
remaining: Option<&RefCell<HashMap<usize, usize>>>,
) -> Result<TokenStream, CodegenError> {
self.build_arithmetic_expr(expr, string_intern, &|arg| match arg {
TransformationArgument::KV((_, idx)) => {
let ident = fields.get(*idx).ok_or_else(|| {
CodegenError::internal(format!(
"row index {idx} out of bounds (row arity {})",
fields.len()
))
})?;
let needs_clone = remaining.is_some_and(|rem| {
rem.borrow_mut()
.get_mut(idx)
.map(|count| {
*count -= 1;
*count > 0
})
.unwrap_or(false)
});
Ok(if needs_clone {
quote! { #ident.clone() }
} else {
quote! { #ident }
})
}
_ => Err(CodegenError::internal(
"non-KV argument in row arithmetic builder",
)),
})
}
fn build_kv_args_arithmetic_expr(
&mut self,
expr: &ArithmeticArgument,
string_intern: bool,
remaining: Option<&RefCell<HashMap<(bool, usize), usize>>>,
) -> Result<TokenStream, CodegenError> {
self.build_arithmetic_expr(expr, string_intern, &|arg| match arg {
TransformationArgument::KV((is_key, idx))
| TransformationArgument::Jn((_, is_key, idx)) => {
let i = Index::from(*idx);
let needs_clone = remaining.is_some_and(|rem| {
let mut map = rem.borrow_mut();
map.get_mut(&(*is_key, *idx))
.map(|count| {
*count -= 1;
*count > 0
})
.unwrap_or(false)
});
Ok(match (is_key, needs_clone) {
(true, true) => quote! { k.#i.clone() },
(true, false) => quote! { k.#i },
(false, true) => quote! { v.#i.clone() },
(false, false) => quote! { v.#i },
})
}
})
}
fn build_join_args_arithmetic_expr(
&mut self,
expr: &ArithmeticArgument,
string_intern: bool,
) -> Result<TokenStream, CodegenError> {
self.build_arithmetic_expr(expr, string_intern, &|arg| match arg {
TransformationArgument::Jn((is_left, is_key, idx)) => {
let i = Index::from(*idx);
let ident = if *is_key {
Ident::new("k", Span::call_site())
} else if *is_left {
Ident::new("lv", Span::call_site())
} else {
Ident::new("rv", Span::call_site())
};
Ok(quote! { #ident.#i.clone() })
}
_ => Err(CodegenError::internal(
"non-Jn argument in join arithmetic builder",
)),
})
}
}
impl CodeGen {
pub(super) fn udf_param_types(&self, name: &str) -> Vec<DataType> {
self.program
.udfs()
.iter()
.find(|f| f.name() == name)
.map(|f| f.params().iter().map(|p| *p.data_type()).collect())
.unwrap_or_default()
}
pub(super) fn udf_return_type(&self, name: &str) -> Option<DataType> {
self.program
.udfs()
.iter()
.find(|f| f.name() == name)
.map(|f| f.ret_type())
}
}
fn param_type_at(types: &[DataType], idx: usize) -> Option<DataType> {
types.get(idx).copied()
}
fn wrap_udf_arg(
token: TokenStream,
param_type: Option<DataType>,
string_intern: bool,
) -> TokenStream {
if string_intern && param_type == Some(DataType::String) {
quote! { resolve((#token).clone()).to_string() }
} else {
quote! { (#token).clone() }
}
}