reifydb_engine/expression/
call.rs1use reifydb_core::value::column::{Column, columns::Columns, data::ColumnData, view::group_by::GroupByView};
5use reifydb_routine::function::{FunctionCapability, FunctionContext, error::FunctionError, registry::Functions};
6use reifydb_rql::expression::CallExpression;
7use reifydb_type::{
8 error::Error,
9 fragment::Fragment,
10 value::{Value, r#type::Type},
11};
12
13use crate::{Result, error::EngineError, expression::context::EvalContext};
14
15pub(crate) fn call_builtin(
16 ctx: &EvalContext,
17 call: &CallExpression,
18 arguments: Columns,
19 functions: &Functions,
20) -> Result<Column> {
21 let function_name = call.func.0.text();
22 let fn_fragment = call.func.0.clone();
23
24 assert!(
27 ctx.symbols.get_function(function_name).is_none(),
28 "UDF '{}' should have been hoisted to UdfEvalNode",
29 function_name
30 );
31
32 let function = functions.get(function_name).ok_or_else(|| -> Error {
33 EngineError::UnknownFunction {
34 name: function_name.to_string(),
35 fragment: fn_fragment.clone(),
36 }
37 .into()
38 })?;
39
40 let fn_ctx = FunctionContext::new(fn_fragment.clone(), ctx.runtime_context, ctx.identity, ctx.row_count);
41
42 if ctx.is_aggregate_context && function.capabilities().contains(&FunctionCapability::Aggregate) {
44 let mut accumulator = function.accumulator(&fn_ctx).ok_or_else(|| FunctionError::ExecutionFailed {
45 function: fn_fragment.clone(),
46 reason: format!("Function {} is not an aggregate", function_name),
47 })?;
48
49 let column = if call.args.is_empty() {
50 Column {
51 name: Fragment::internal("dummy"),
52 data: ColumnData::with_capacity(Type::Int4, ctx.row_count),
53 }
54 } else {
55 arguments[0].clone()
56 };
57
58 let mut group_view = GroupByView::new();
59 let all_indices: Vec<usize> = (0..ctx.row_count).collect();
60 group_view.insert(Vec::<Value>::new(), all_indices);
61
62 accumulator
63 .update(&Columns::new(vec![column]), &group_view)
64 .map_err(|e| e.with_context(fn_fragment.clone()))?;
65
66 let (_keys, result_data) = accumulator.finalize().map_err(|e| e.with_context(fn_fragment))?;
67
68 return Ok(Column {
69 name: call.full_fragment_owned(),
70 data: result_data,
71 });
72 }
73
74 let result_columns = function.call(&fn_ctx, &arguments).map_err(|e| e.with_context(fn_fragment))?;
75
76 let result_column = result_columns.into_iter().next().ok_or_else(|| FunctionError::ExecutionFailed {
78 function: call.func.0.clone(),
79 reason: "Function returned no columns".to_string(),
80 })?;
81
82 Ok(Column {
83 name: call.full_fragment_owned(),
84 data: result_column.data,
85 })
86}