1#![cfg_attr(not(debug_assertions), deny(warnings))]
5
6use reifydb_core::value::column::{
7 Column,
8 columns::Columns,
9 data::ColumnData,
10 view::group_by::{GroupByView, GroupKey},
11};
12use reifydb_runtime::clock::Clock;
13use reifydb_transaction::transaction::Transaction;
14use reifydb_type::{
15 fragment::Fragment,
16 util::bitvec::BitVec,
17 value::{identity::IdentityId, r#type::Type},
18};
19
20pub mod blob;
21pub mod clock;
22pub mod date;
23pub mod datetime;
24pub mod duration;
25pub mod error;
26pub mod flow;
27pub mod identity;
28pub mod is;
29pub mod json;
30pub mod math;
31pub mod meta;
32pub mod registry;
33pub mod series;
34pub mod subscription;
35pub mod text;
36pub mod time;
37pub mod wasm;
38
39use error::{AggregateFunctionResult, GeneratorFunctionResult, ScalarFunctionResult};
40use reifydb_catalog::catalog::Catalog;
41
42pub struct GeneratorContext<'a> {
43 pub fragment: Fragment,
44 pub params: Columns,
45 pub txn: &'a mut Transaction<'a>,
46 pub catalog: &'a Catalog,
47 pub identity: IdentityId,
48}
49
50pub trait GeneratorFunction: Send + Sync {
51 fn generate<'a>(&self, ctx: GeneratorContext<'a>) -> GeneratorFunctionResult<Columns>;
52}
53
54pub struct ScalarFunctionContext<'a> {
55 pub fragment: Fragment,
56 pub columns: &'a Columns,
57 pub row_count: usize,
58 pub clock: &'a Clock,
59 pub identity: IdentityId,
60}
61pub trait ScalarFunction: Send + Sync {
62 fn scalar<'a>(&'a self, ctx: ScalarFunctionContext<'a>) -> ScalarFunctionResult<ColumnData>;
63 fn return_type(&self, input_types: &[Type]) -> Type;
64}
65
66pub struct AggregateFunctionContext<'a> {
67 pub fragment: Fragment,
68 pub column: &'a Column,
69 pub groups: &'a GroupByView,
70}
71
72pub trait AggregateFunction: Send + Sync {
73 fn aggregate<'a>(&'a mut self, ctx: AggregateFunctionContext<'a>) -> AggregateFunctionResult<()>;
74 fn finalize(&mut self) -> AggregateFunctionResult<(Vec<GroupKey>, ColumnData)>;
75}
76
77pub fn propagate_options(
89 func: &dyn ScalarFunction,
90 ctx: &ScalarFunctionContext,
91) -> Option<ScalarFunctionResult<ColumnData>> {
92 let has_option = ctx.columns.iter().any(|c| matches!(c.data(), ColumnData::Option { .. }));
93 if !has_option {
94 return None;
95 }
96
97 let mut combined_bv: Option<BitVec> = None;
98 let mut unwrapped = Vec::with_capacity(ctx.columns.len());
99 for col in ctx.columns.iter() {
100 let (inner, bv) = col.data().unwrap_option();
101 if let Some(bv) = bv {
102 combined_bv = Some(match combined_bv {
103 Some(existing) => existing.and(bv),
104 None => bv.clone(),
105 });
106 }
107 unwrapped.push(Column::new(col.name().clone(), inner.clone()));
108 }
109
110 if let Some(ref bv) = combined_bv {
114 if bv.count_ones() == 0 {
115 let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data().get_type()).collect();
116 let result_type = func.return_type(&input_types);
117 return Some(Ok(ColumnData::none_typed(result_type, ctx.row_count)));
118 }
119 }
120
121 let unwrapped_columns = Columns::new(unwrapped);
122 let result = func.scalar(ScalarFunctionContext {
123 fragment: ctx.fragment.clone(),
124 columns: &unwrapped_columns,
125 row_count: ctx.row_count,
126 clock: ctx.clock,
127 identity: ctx.identity,
128 });
129
130 Some(result.map(|data| match combined_bv {
131 Some(bv) => ColumnData::Option {
132 inner: Box::new(data),
133 bitvec: bv,
134 },
135 None => data,
136 }))
137}