Skip to main content

reifydb_function/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4#![cfg_attr(not(debug_assertions), deny(warnings))]
5
6use reifydb_core::value::column::{
7	Column,
8	columns::Columns,
9	data::ColumnData,
10	view::group_by::{GroupByView, GroupKey},
11};
12use reifydb_runtime::clock::Clock;
13use reifydb_transaction::transaction::Transaction;
14use reifydb_type::{
15	fragment::Fragment,
16	util::bitvec::BitVec,
17	value::{identity::IdentityId, r#type::Type},
18};
19
20pub mod blob;
21pub mod clock;
22pub mod date;
23pub mod datetime;
24pub mod duration;
25pub mod error;
26pub mod flow;
27pub mod identity;
28pub mod is;
29pub mod math;
30pub mod meta;
31pub mod registry;
32pub mod series;
33pub mod subscription;
34pub mod text;
35pub mod time;
36pub mod wasm;
37
38use error::{AggregateFunctionResult, GeneratorFunctionResult, ScalarFunctionResult};
39use reifydb_catalog::catalog::Catalog;
40
/// Inputs handed to a [`GeneratorFunction`] for a single `generate` call.
///
/// NOTE(review): `txn` is typed `&'a mut Transaction<'a>`, so the mutable
/// borrow and the transaction's own lifetime parameter share the same `'a`.
/// That shape usually pins the borrow for the transaction's whole lifetime;
/// confirm this is intentional before relaxing or relying on it.
pub struct GeneratorContext<'a> {
	/// Source fragment tied to the call (presumably used for diagnostics; confirm against callers).
	pub fragment: Fragment,
	/// Parameter columns supplied to the generator (owned by the context).
	pub params: Columns,
	/// Exclusive borrow of the transaction the generator may use.
	pub txn: &'a mut Transaction<'a>,
	/// Shared borrow of the catalog.
	pub catalog: &'a Catalog,
	/// Identity associated with the call (presumably the caller; confirm).
	pub identity: IdentityId,
}
48
/// A function that turns a [`GeneratorContext`] into a set of columns.
///
/// Implementors must be `Send + Sync`.
pub trait GeneratorFunction: Send + Sync {
	/// Takes ownership of `ctx` and returns the produced [`Columns`], or a
	/// generator error through [`GeneratorFunctionResult`].
	fn generate<'a>(&self, ctx: GeneratorContext<'a>) -> GeneratorFunctionResult<Columns>;
}
52
/// Inputs handed to a [`ScalarFunction`] for a single `scalar` call.
pub struct ScalarFunctionContext<'a> {
	/// Source fragment tied to the call (presumably used for diagnostics; confirm against callers).
	pub fragment: Fragment,
	/// Argument columns the function operates on.
	pub columns: &'a Columns,
	/// Row count for the call; `propagate_options` uses it as the length of
	/// the all-none result column it builds.
	pub row_count: usize,
	/// Borrowed clock made available to the function.
	pub clock: &'a Clock,
	/// Identity associated with the call (presumably the caller; confirm).
	pub identity: IdentityId,
}
/// A function evaluated over argument columns that yields one column of data.
///
/// Implementors must be `Send + Sync`.
pub trait ScalarFunction: Send + Sync {
	/// Evaluates the function for the columns in `ctx` and returns the
	/// resulting [`ColumnData`], or a scalar-function error.
	fn scalar<'a>(&'a self, ctx: ScalarFunctionContext<'a>) -> ScalarFunctionResult<ColumnData>;
	/// Reports the result type for the given argument types.
	///
	/// `propagate_options` calls this with the types of the Option-unwrapped
	/// argument columns to build a typed all-none result without invoking
	/// `scalar`.
	fn return_type(&self, input_types: &[Type]) -> Type;
}
64
/// Inputs handed to an [`AggregateFunction`] for a single `aggregate` call.
pub struct AggregateFunctionContext<'a> {
	/// Source fragment tied to the call (presumably used for diagnostics; confirm against callers).
	pub fragment: Fragment,
	/// The single column being aggregated.
	pub column: &'a Column,
	/// Group-by view over the rows (presumably maps each group key to its row indices; confirm).
	pub groups: &'a GroupByView,
}
70
/// A stateful aggregate: fed input through `aggregate`, then drained with
/// `finalize`.
///
/// Implementors must be `Send + Sync`. Both methods take `&mut self`.
pub trait AggregateFunction: Send + Sync {
	/// Processes one column together with its grouping; returns `Ok(())` on
	/// success. Presumably folds the input into internal per-group state;
	/// confirm against implementors.
	fn aggregate<'a>(&'a mut self, ctx: AggregateFunctionContext<'a>) -> AggregateFunctionResult<()>;
	/// Produces the group keys together with the aggregated column data.
	/// NOTE(review): the i-th key presumably corresponds to the i-th value;
	/// confirm against implementors.
	fn finalize(&mut self) -> AggregateFunctionResult<(Vec<GroupKey>, ColumnData)>;
}
75
76/// Helper for scalar functions to opt into Option propagation.
77///
78/// If any argument column is `ColumnData::Option`,
79/// this unwraps the Option wrappers, calls `func.scalar()` recursively on the
80/// inner data, and re-wraps the result with the combined bitvec.
81///
82/// Returns `None` when no Option columns are present (the caller should
83/// proceed with its normal typed logic).
84///
85/// Functions that need raw access to Options (e.g. `is::some`, `is::none`)
86/// simply don't call this helper.
87pub fn propagate_options(
88	func: &dyn ScalarFunction,
89	ctx: &ScalarFunctionContext,
90) -> Option<ScalarFunctionResult<ColumnData>> {
91	let has_option = ctx.columns.iter().any(|c| matches!(c.data(), ColumnData::Option { .. }));
92	if !has_option {
93		return None;
94	}
95
96	let mut combined_bv: Option<BitVec> = None;
97	let mut unwrapped = Vec::with_capacity(ctx.columns.len());
98	for col in ctx.columns.iter() {
99		let (inner, bv) = col.data().unwrap_option();
100		if let Some(bv) = bv {
101			combined_bv = Some(match combined_bv {
102				Some(existing) => existing.and(bv),
103				None => bv.clone(),
104			});
105		}
106		unwrapped.push(Column::new(col.name().clone(), inner.clone()));
107	}
108
109	// Short-circuit: when all combined values are None, skip the inner function
110	// call entirely to avoid type-validation errors on placeholder inner types
111	// (e.g. none typed as Option<Any> would fail numeric type checks).
112	if let Some(ref bv) = combined_bv {
113		if bv.count_ones() == 0 {
114			let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data().get_type()).collect();
115			let result_type = func.return_type(&input_types);
116			return Some(Ok(ColumnData::none_typed(result_type, ctx.row_count)));
117		}
118	}
119
120	let unwrapped_columns = Columns::new(unwrapped);
121	let result = func.scalar(ScalarFunctionContext {
122		fragment: ctx.fragment.clone(),
123		columns: &unwrapped_columns,
124		row_count: ctx.row_count,
125		clock: ctx.clock,
126		identity: ctx.identity,
127	});
128
129	Some(result.map(|data| match combined_bv {
130		Some(bv) => ColumnData::Option {
131			inner: Box::new(data),
132			bitvec: bv,
133		},
134		None => data,
135	}))
136}