reifydb_engine/expression/
call.rs1use reifydb_core::value::column::{
5 ColumnWithName, buffer::ColumnBuffer, columns::Columns, view::group_by::GroupByView,
6};
7use reifydb_routine::routine::{FunctionKind, context::FunctionContext, error::RoutineError};
8use reifydb_rql::expression::{CallExpression, Expression, name::display_label};
9use reifydb_type::{
10 error::Error,
11 fragment::Fragment,
12 value::{Value, r#type::Type},
13};
14
15use crate::{Result, error::EngineError, expression::context::EvalContext};
16
17pub(crate) fn call_builtin(ctx: &EvalContext, call: &CallExpression, arguments: Columns) -> Result<ColumnWithName> {
18 let function_name = call.func.0.text();
19 let fn_fragment = call.func.0.clone();
20 let result_label = display_label(&Expression::Call(call.clone()));
21
22 assert!(
23 ctx.symbols.get_function(function_name).is_none(),
24 "UDF '{}' should have been hoisted to UdfEvalNode",
25 function_name
26 );
27
28 let routine = ctx.routines.get_function(function_name).ok_or_else(|| -> Error {
29 EngineError::UnknownFunction {
30 name: function_name.to_string(),
31 fragment: fn_fragment.clone(),
32 }
33 .into()
34 })?;
35
36 let mut fn_ctx = FunctionContext {
37 fragment: fn_fragment.clone(),
38 identity: ctx.identity,
39 row_count: ctx.row_count,
40 runtime_context: ctx.runtime_context,
41 };
42
43 if ctx.is_aggregate_context && routine.kinds().contains(&FunctionKind::Aggregate) {
44 let mut accumulator =
45 routine.accumulator(&mut fn_ctx).ok_or_else(|| RoutineError::FunctionExecutionFailed {
46 function: fn_fragment.clone(),
47 reason: format!("Function {} is not an aggregate", function_name),
48 })?;
49
50 let column = if call.args.is_empty() {
51 ColumnWithName {
52 name: Fragment::internal("dummy"),
53 data: ColumnBuffer::with_capacity(Type::Int4, ctx.row_count),
54 }
55 } else {
56 ColumnWithName::new(arguments.name_at(0).clone(), arguments[0].clone())
57 };
58
59 let mut group_view = GroupByView::new();
60 let all_indices: Vec<usize> = (0..ctx.row_count).collect();
61 group_view.insert(Vec::<Value>::new(), all_indices);
62
63 accumulator
64 .update(&Columns::new(vec![column]), &group_view)
65 .map_err(|e| e.with_context(fn_fragment.clone(), false))?;
66
67 let (_keys, result_data) = accumulator.finalize().map_err(|e| e.with_context(fn_fragment, false))?;
68
69 return Ok(ColumnWithName::new(result_label.clone(), result_data));
70 }
71
72 let result_columns = routine.call(&mut fn_ctx, &arguments).map_err(|e| e.with_context(fn_fragment, false))?;
73
74 if result_columns.is_empty() {
75 return Err(RoutineError::FunctionExecutionFailed {
76 function: call.func.0.clone(),
77 reason: "Function returned no columns".to_string(),
78 }
79 .into());
80 }
81 let result_data = result_columns.data_at(0).clone();
82 Ok(ColumnWithName::new(result_label, result_data))
83}