1use reifydb_core::value::column::{
5 Column,
6 columns::Columns,
7 data::ColumnData,
8 view::group_by::{GroupByView, GroupKey},
9};
10use reifydb_runtime::clock::Clock;
11use reifydb_transaction::transaction::Transaction;
12use reifydb_type::{fragment::Fragment, util::bitvec::BitVec, value::r#type::Type};
13
14pub mod blob;
15pub mod clock;
16pub mod date;
17pub mod datetime;
18pub mod duration;
19pub mod error;
20pub mod flow;
21pub mod is;
22pub mod math;
23pub mod meta;
24pub mod registry;
25pub mod series;
26pub mod subscription;
27pub mod text;
28pub mod time;
29
30use error::{AggregateFunctionResult, GeneratorFunctionResult, ScalarFunctionResult};
31use reifydb_catalog::catalog::Catalog;
32
/// Everything a [`GeneratorFunction`] receives when it is invoked.
///
/// The single lifetime `'a` covers both the mutable transaction borrow and
/// the catalog borrow, so the context cannot outlive either of them.
pub struct GeneratorContext<'a> {
	/// Fragment associated with this invocation.
	/// NOTE(review): presumably the source location of the call, used for
	/// error reporting — confirm against callers.
	pub fragment: Fragment,
	/// Owned parameter columns passed to the generator.
	pub params: Columns,
	/// Exclusive access to the transaction the generator runs in.
	pub txn: &'a mut Transaction<'a>,
	/// Shared access to the catalog.
	pub catalog: &'a Catalog,
}
39
/// A function that consumes a [`GeneratorContext`] and produces a set of
/// [`Columns`].
///
/// Implementors must be `Send + Sync`.
pub trait GeneratorFunction: Send + Sync {
	/// Runs the generator with the given context, taking ownership of it,
	/// and returns the generated columns or a generator error.
	fn generate<'a>(&self, ctx: GeneratorContext<'a>) -> GeneratorFunctionResult<Columns>;
}
43
/// Borrowed inputs handed to a [`ScalarFunction`] for one evaluation.
pub struct ScalarFunctionContext<'a> {
	/// Fragment associated with this invocation.
	/// NOTE(review): presumably the source location of the call, used for
	/// error reporting — confirm against callers.
	pub fragment: Fragment,
	/// Argument columns the function operates on.
	pub columns: &'a Columns,
	/// Number of rows in this evaluation; `propagate_options` uses it as the
	/// length of the all-none column it builds.
	pub row_count: usize,
	/// Clock made available to the function.
	pub clock: &'a Clock,
}
/// A function that maps a set of argument columns to a single result column.
///
/// Implementors must be `Send + Sync`.
pub trait ScalarFunction: Send + Sync {
	/// Evaluates the function over `ctx.columns` and returns the resulting
	/// column data, or a scalar-function error.
	///
	/// The borrow of `self` shares the lifetime `'a` with the context.
	fn scalar<'a>(&'a self, ctx: ScalarFunctionContext<'a>) -> ScalarFunctionResult<ColumnData>;
	/// Reports the result [`Type`] for the given argument types without
	/// evaluating anything. `propagate_options` relies on this to type the
	/// all-none column it returns when the function is not invoked.
	fn return_type(&self, input_types: &[Type]) -> Type;
}
54
/// Borrowed inputs handed to an [`AggregateFunction`] for one call to
/// `aggregate`.
pub struct AggregateFunctionContext<'a> {
	/// Fragment associated with this invocation.
	/// NOTE(review): presumably the source location of the call, used for
	/// error reporting — confirm against callers.
	pub fragment: Fragment,
	/// The column being aggregated.
	pub column: &'a Column,
	/// Grouping view for the aggregation.
	/// NOTE(review): presumably maps each group key to the rows of `column`
	/// that belong to it — confirm against `GroupByView`.
	pub groups: &'a GroupByView,
}
60
/// A stateful aggregate: `aggregate` takes `&mut self` and returns no data,
/// while `finalize` yields the group keys together with the result column.
///
/// Implementors must be `Send + Sync`.
pub trait AggregateFunction: Send + Sync {
	/// Feeds one column and its grouping into the aggregate.
	/// NOTE(review): presumably may be called repeatedly, accumulating state
	/// across calls — confirm against callers.
	fn aggregate<'a>(&'a mut self, ctx: AggregateFunctionContext<'a>) -> AggregateFunctionResult<()>;
	/// Produces the group keys and the aggregated column data.
	/// NOTE(review): presumably the i-th key corresponds to the i-th row of
	/// the returned column — confirm against implementors.
	fn finalize(&mut self) -> AggregateFunctionResult<(Vec<GroupKey>, ColumnData)>;
}
65
66pub fn propagate_options(
78 func: &dyn ScalarFunction,
79 ctx: &ScalarFunctionContext,
80) -> Option<ScalarFunctionResult<ColumnData>> {
81 let has_option = ctx.columns.iter().any(|c| matches!(c.data(), ColumnData::Option { .. }));
82 if !has_option {
83 return None;
84 }
85
86 let mut combined_bv: Option<BitVec> = None;
87 let mut unwrapped = Vec::with_capacity(ctx.columns.len());
88 for col in ctx.columns.iter() {
89 let (inner, bv) = col.data().unwrap_option();
90 if let Some(bv) = bv {
91 combined_bv = Some(match combined_bv {
92 Some(existing) => existing.and(bv),
93 None => bv.clone(),
94 });
95 }
96 unwrapped.push(Column::new(col.name().clone(), inner.clone()));
97 }
98
99 if let Some(ref bv) = combined_bv {
103 if bv.count_ones() == 0 {
104 let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data().get_type()).collect();
105 let result_type = func.return_type(&input_types);
106 return Some(Ok(ColumnData::none_typed(result_type, ctx.row_count)));
107 }
108 }
109
110 let unwrapped_columns = Columns::new(unwrapped);
111 let result = func.scalar(ScalarFunctionContext {
112 fragment: ctx.fragment.clone(),
113 columns: &unwrapped_columns,
114 row_count: ctx.row_count,
115 clock: ctx.clock,
116 });
117
118 Some(result.map(|data| match combined_bv {
119 Some(bv) => ColumnData::Option {
120 inner: Box::new(data),
121 bitvec: bv,
122 },
123 None => data,
124 }))
125}