Skip to main content

reifydb_engine/expression/
call.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::{Column, columns::Columns, data::ColumnData, view::group_by::GroupByView};
5use reifydb_routine::function::{FunctionCapability, FunctionContext, error::FunctionError, registry::Functions};
6use reifydb_rql::expression::CallExpression;
7use reifydb_type::{
8	error::Error,
9	fragment::Fragment,
10	value::{Value, r#type::Type},
11};
12
13use crate::{Result, error::EngineError, expression::context::EvalContext};
14
15pub(crate) fn call_builtin(
16	ctx: &EvalContext,
17	call: &CallExpression,
18	arguments: Columns,
19	functions: &Functions,
20) -> Result<Column> {
21	let function_name = call.func.0.text();
22	let fn_fragment = call.func.0.clone();
23
24	// UDFs are hoisted to UdfEvalNode during volcano initialization.
25	// If one reaches here, it's a bug in the query plan.
26	assert!(
27		ctx.symbols.get_function(function_name).is_none(),
28		"UDF '{}' should have been hoisted to UdfEvalNode",
29		function_name
30	);
31
32	let function = functions.get(function_name).ok_or_else(|| -> Error {
33		EngineError::UnknownFunction {
34			name: function_name.to_string(),
35			fragment: fn_fragment.clone(),
36		}
37		.into()
38	})?;
39
40	let fn_ctx = FunctionContext::new(fn_fragment.clone(), ctx.runtime_context, ctx.identity, ctx.row_count);
41
42	// Check if we're in aggregation context and if function exists as aggregate
43	if ctx.is_aggregate_context && function.capabilities().contains(&FunctionCapability::Aggregate) {
44		let mut accumulator = function.accumulator(&fn_ctx).ok_or_else(|| FunctionError::ExecutionFailed {
45			function: fn_fragment.clone(),
46			reason: format!("Function {} is not an aggregate", function_name),
47		})?;
48
49		let column = if call.args.is_empty() {
50			Column {
51				name: Fragment::internal("dummy"),
52				data: ColumnData::with_capacity(Type::Int4, ctx.row_count),
53			}
54		} else {
55			arguments[0].clone()
56		};
57
58		let mut group_view = GroupByView::new();
59		let all_indices: Vec<usize> = (0..ctx.row_count).collect();
60		group_view.insert(Vec::<Value>::new(), all_indices);
61
62		accumulator
63			.update(&Columns::new(vec![column]), &group_view)
64			.map_err(|e| e.with_context(fn_fragment.clone()))?;
65
66		let (_keys, result_data) = accumulator.finalize().map_err(|e| e.with_context(fn_fragment))?;
67
68		return Ok(Column {
69			name: call.full_fragment_owned(),
70			data: result_data,
71		});
72	}
73
74	let result_columns = function.call(&fn_ctx, &arguments).map_err(|e| e.with_context(fn_fragment))?;
75
76	// For scalar, we expect 1 column. For generator in scalar context, we take the first column.
77	let result_column = result_columns.into_iter().next().ok_or_else(|| FunctionError::ExecutionFailed {
78		function: call.func.0.clone(),
79		reason: "Function returned no columns".to_string(),
80	})?;
81
82	Ok(Column {
83		name: call.full_fragment_owned(),
84		data: result_column.data,
85	})
86}