Skip to main content

reifydb_function/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4#![cfg_attr(not(debug_assertions), deny(warnings))]
5
6use reifydb_core::value::column::{
7	Column,
8	columns::Columns,
9	data::ColumnData,
10	view::group_by::{GroupByView, GroupKey},
11};
12use reifydb_runtime::clock::Clock;
13use reifydb_transaction::transaction::Transaction;
14use reifydb_type::{
15	fragment::Fragment,
16	util::bitvec::BitVec,
17	value::{identity::IdentityId, r#type::Type},
18};
19
20pub mod blob;
21pub mod clock;
22pub mod date;
23pub mod datetime;
24pub mod duration;
25pub mod error;
26pub mod flow;
27pub mod identity;
28pub mod is;
29pub mod json;
30pub mod math;
31pub mod meta;
32pub mod registry;
33pub mod series;
34pub mod subscription;
35pub mod text;
36pub mod time;
37pub mod wasm;
38
39use error::{AggregateFunctionResult, GeneratorFunctionResult, ScalarFunctionResult};
40use reifydb_catalog::catalog::Catalog;
41
42pub struct GeneratorContext<'a> {
43	pub fragment: Fragment,
44	pub params: Columns,
45	pub txn: &'a mut Transaction<'a>,
46	pub catalog: &'a Catalog,
47	pub identity: IdentityId,
48}
49
50pub trait GeneratorFunction: Send + Sync {
51	fn generate<'a>(&self, ctx: GeneratorContext<'a>) -> GeneratorFunctionResult<Columns>;
52}
53
54pub struct ScalarFunctionContext<'a> {
55	pub fragment: Fragment,
56	pub columns: &'a Columns,
57	pub row_count: usize,
58	pub clock: &'a Clock,
59	pub identity: IdentityId,
60}
61pub trait ScalarFunction: Send + Sync {
62	fn scalar<'a>(&'a self, ctx: ScalarFunctionContext<'a>) -> ScalarFunctionResult<ColumnData>;
63	fn return_type(&self, input_types: &[Type]) -> Type;
64}
65
66pub struct AggregateFunctionContext<'a> {
67	pub fragment: Fragment,
68	pub column: &'a Column,
69	pub groups: &'a GroupByView,
70}
71
72pub trait AggregateFunction: Send + Sync {
73	fn aggregate<'a>(&'a mut self, ctx: AggregateFunctionContext<'a>) -> AggregateFunctionResult<()>;
74	fn finalize(&mut self) -> AggregateFunctionResult<(Vec<GroupKey>, ColumnData)>;
75}
76
77/// Helper for scalar functions to opt into Option propagation.
78///
79/// If any argument column is `ColumnData::Option`,
80/// this unwraps the Option wrappers, calls `func.scalar()` recursively on the
81/// inner data, and re-wraps the result with the combined bitvec.
82///
83/// Returns `None` when no Option columns are present (the caller should
84/// proceed with its normal typed logic).
85///
86/// Functions that need raw access to Options (e.g. `is::some`, `is::none`)
87/// simply don't call this helper.
88pub fn propagate_options(
89	func: &dyn ScalarFunction,
90	ctx: &ScalarFunctionContext,
91) -> Option<ScalarFunctionResult<ColumnData>> {
92	let has_option = ctx.columns.iter().any(|c| matches!(c.data(), ColumnData::Option { .. }));
93	if !has_option {
94		return None;
95	}
96
97	let mut combined_bv: Option<BitVec> = None;
98	let mut unwrapped = Vec::with_capacity(ctx.columns.len());
99	for col in ctx.columns.iter() {
100		let (inner, bv) = col.data().unwrap_option();
101		if let Some(bv) = bv {
102			combined_bv = Some(match combined_bv {
103				Some(existing) => existing.and(bv),
104				None => bv.clone(),
105			});
106		}
107		unwrapped.push(Column::new(col.name().clone(), inner.clone()));
108	}
109
110	// Short-circuit: when all combined values are None, skip the inner function
111	// call entirely to avoid type-validation errors on placeholder inner types
112	// (e.g. none typed as Option<Any> would fail numeric type checks).
113	if let Some(ref bv) = combined_bv {
114		if bv.count_ones() == 0 {
115			let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data().get_type()).collect();
116			let result_type = func.return_type(&input_types);
117			return Some(Ok(ColumnData::none_typed(result_type, ctx.row_count)));
118		}
119	}
120
121	let unwrapped_columns = Columns::new(unwrapped);
122	let result = func.scalar(ScalarFunctionContext {
123		fragment: ctx.fragment.clone(),
124		columns: &unwrapped_columns,
125		row_count: ctx.row_count,
126		clock: ctx.clock,
127		identity: ctx.identity,
128	});
129
130	Some(result.map(|data| match combined_bv {
131		Some(bv) => ColumnData::Option {
132			inner: Box::new(data),
133			bitvec: bv,
134		},
135		None => data,
136	}))
137}