Skip to main content

reifydb_function/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::{
5	Column,
6	columns::Columns,
7	data::ColumnData,
8	view::group_by::{GroupByView, GroupKey},
9};
10use reifydb_runtime::clock::Clock;
11use reifydb_transaction::transaction::Transaction;
12use reifydb_type::{fragment::Fragment, util::bitvec::BitVec, value::r#type::Type};
13
14pub mod blob;
15pub mod clock;
16pub mod date;
17pub mod datetime;
18pub mod duration;
19pub mod error;
20pub mod flow;
21pub mod is;
22pub mod math;
23pub mod meta;
24pub mod registry;
25pub mod series;
26pub mod subscription;
27pub mod text;
28pub mod time;
29
30use error::{AggregateFunctionResult, GeneratorFunctionResult, ScalarFunctionResult};
31use reifydb_catalog::catalog::Catalog;
32
33pub struct GeneratorContext<'a> {
34	pub fragment: Fragment,
35	pub params: Columns,
36	pub txn: &'a mut Transaction<'a>,
37	pub catalog: &'a Catalog,
38}
39
40pub trait GeneratorFunction: Send + Sync {
41	fn generate<'a>(&self, ctx: GeneratorContext<'a>) -> GeneratorFunctionResult<Columns>;
42}
43
44pub struct ScalarFunctionContext<'a> {
45	pub fragment: Fragment,
46	pub columns: &'a Columns,
47	pub row_count: usize,
48	pub clock: &'a Clock,
49}
50pub trait ScalarFunction: Send + Sync {
51	fn scalar<'a>(&'a self, ctx: ScalarFunctionContext<'a>) -> ScalarFunctionResult<ColumnData>;
52	fn return_type(&self, input_types: &[Type]) -> Type;
53}
54
55pub struct AggregateFunctionContext<'a> {
56	pub fragment: Fragment,
57	pub column: &'a Column,
58	pub groups: &'a GroupByView,
59}
60
61pub trait AggregateFunction: Send + Sync {
62	fn aggregate<'a>(&'a mut self, ctx: AggregateFunctionContext<'a>) -> AggregateFunctionResult<()>;
63	fn finalize(&mut self) -> AggregateFunctionResult<(Vec<GroupKey>, ColumnData)>;
64}
65
66/// Helper for scalar functions to opt into Option propagation.
67///
68/// If any argument column is `ColumnData::Option`,
69/// this unwraps the Option wrappers, calls `func.scalar()` recursively on the
70/// inner data, and re-wraps the result with the combined bitvec.
71///
72/// Returns `None` when no Option columns are present (the caller should
73/// proceed with its normal typed logic).
74///
75/// Functions that need raw access to Options (e.g. `is::some`, `is::none`)
76/// simply don't call this helper.
77pub fn propagate_options(
78	func: &dyn ScalarFunction,
79	ctx: &ScalarFunctionContext,
80) -> Option<ScalarFunctionResult<ColumnData>> {
81	let has_option = ctx.columns.iter().any(|c| matches!(c.data(), ColumnData::Option { .. }));
82	if !has_option {
83		return None;
84	}
85
86	let mut combined_bv: Option<BitVec> = None;
87	let mut unwrapped = Vec::with_capacity(ctx.columns.len());
88	for col in ctx.columns.iter() {
89		let (inner, bv) = col.data().unwrap_option();
90		if let Some(bv) = bv {
91			combined_bv = Some(match combined_bv {
92				Some(existing) => existing.and(bv),
93				None => bv.clone(),
94			});
95		}
96		unwrapped.push(Column::new(col.name().clone(), inner.clone()));
97	}
98
99	// Short-circuit: when all combined values are None, skip the inner function
100	// call entirely to avoid type-validation errors on placeholder inner types
101	// (e.g. none typed as Option<Any> would fail numeric type checks).
102	if let Some(ref bv) = combined_bv {
103		if bv.count_ones() == 0 {
104			let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data().get_type()).collect();
105			let result_type = func.return_type(&input_types);
106			return Some(Ok(ColumnData::none_typed(result_type, ctx.row_count)));
107		}
108	}
109
110	let unwrapped_columns = Columns::new(unwrapped);
111	let result = func.scalar(ScalarFunctionContext {
112		fragment: ctx.fragment.clone(),
113		columns: &unwrapped_columns,
114		row_count: ctx.row_count,
115		clock: ctx.clock,
116	});
117
118	Some(result.map(|data| match combined_bv {
119		Some(bv) => ColumnData::Option {
120			inner: Box::new(data),
121			bitvec: bv,
122		},
123		None => data,
124	}))
125}