Skip to main content

reifydb_engine/expression/
call.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::{
5	ColumnWithName, buffer::ColumnBuffer, columns::Columns, view::group_by::GroupByView,
6};
7use reifydb_routine::routine::{FunctionKind, context::FunctionContext, error::RoutineError};
8use reifydb_rql::expression::{CallExpression, Expression, name::display_label};
9use reifydb_type::{
10	error::Error,
11	fragment::Fragment,
12	value::{Value, r#type::Type},
13};
14
15use crate::{Result, error::EngineError, expression::context::EvalContext};
16
17pub(crate) fn call_builtin(ctx: &EvalContext, call: &CallExpression, arguments: Columns) -> Result<ColumnWithName> {
18	let function_name = call.func.0.text();
19	let fn_fragment = call.func.0.clone();
20	let result_label = display_label(&Expression::Call(call.clone()));
21
22	// UDFs are hoisted to UdfEvalNode during volcano initialization.
23	// If one reaches here, it's a bug in the query plan.
24	assert!(
25		ctx.symbols.get_function(function_name).is_none(),
26		"UDF '{}' should have been hoisted to UdfEvalNode",
27		function_name
28	);
29
30	let routine = ctx.routines.get_function(function_name).ok_or_else(|| -> Error {
31		EngineError::UnknownFunction {
32			name: function_name.to_string(),
33			fragment: fn_fragment.clone(),
34		}
35		.into()
36	})?;
37
38	let mut fn_ctx = FunctionContext {
39		fragment: fn_fragment.clone(),
40		identity: ctx.identity,
41		row_count: ctx.row_count,
42		runtime_context: ctx.runtime_context,
43	};
44
45	// Aggregate scalar-context fast path (e.g. `sum(x)` inside a SELECT projection
46	// during GROUP-BY-less aggregation). Mirrors today's behaviour: build a single
47	// group, run the accumulator, return the finalised value.
48	if ctx.is_aggregate_context && routine.kinds().contains(&FunctionKind::Aggregate) {
49		let mut accumulator =
50			routine.accumulator(&mut fn_ctx).ok_or_else(|| RoutineError::FunctionExecutionFailed {
51				function: fn_fragment.clone(),
52				reason: format!("Function {} is not an aggregate", function_name),
53			})?;
54
55		let column = if call.args.is_empty() {
56			ColumnWithName {
57				name: Fragment::internal("dummy"),
58				data: ColumnBuffer::with_capacity(Type::Int4, ctx.row_count),
59			}
60		} else {
61			ColumnWithName::new(arguments.name_at(0).clone(), arguments[0].clone())
62		};
63
64		let mut group_view = GroupByView::new();
65		let all_indices: Vec<usize> = (0..ctx.row_count).collect();
66		group_view.insert(Vec::<Value>::new(), all_indices);
67
68		accumulator
69			.update(&Columns::new(vec![column]), &group_view)
70			.map_err(|e| e.with_context(fn_fragment.clone(), false))?;
71
72		let (_keys, result_data) = accumulator.finalize().map_err(|e| e.with_context(fn_fragment, false))?;
73
74		return Ok(ColumnWithName::new(result_label.clone(), result_data));
75	}
76
77	let result_columns = routine.call(&mut fn_ctx, &arguments).map_err(|e| e.with_context(fn_fragment, false))?;
78
79	// For scalar, we expect 1 column. For generator in scalar context, we take the first column.
80	if result_columns.is_empty() {
81		return Err(RoutineError::FunctionExecutionFailed {
82			function: call.func.0.clone(),
83			reason: "Function returned no columns".to_string(),
84		}
85		.into());
86	}
87	let result_data = result_columns.data_at(0).clone();
88	Ok(ColumnWithName::new(result_label, result_data))
89}