1#![cfg_attr(not(debug_assertions), deny(warnings))]
5
6use reifydb_core::value::column::{
7 Column,
8 columns::Columns,
9 data::ColumnData,
10 view::group_by::{GroupByView, GroupKey},
11};
12use reifydb_runtime::clock::Clock;
13use reifydb_transaction::transaction::Transaction;
14use reifydb_type::{
15 fragment::Fragment,
16 util::bitvec::BitVec,
17 value::{identity::IdentityId, r#type::Type},
18};
19
20pub mod blob;
21pub mod clock;
22pub mod date;
23pub mod datetime;
24pub mod duration;
25pub mod error;
26pub mod flow;
27pub mod identity;
28pub mod is;
29pub mod math;
30pub mod meta;
31pub mod registry;
32pub mod series;
33pub mod subscription;
34pub mod text;
35pub mod time;
36pub mod wasm;
37
38use error::{AggregateFunctionResult, GeneratorFunctionResult, ScalarFunctionResult};
39use reifydb_catalog::catalog::Catalog;
40
41pub struct GeneratorContext<'a> {
42 pub fragment: Fragment,
43 pub params: Columns,
44 pub txn: &'a mut Transaction<'a>,
45 pub catalog: &'a Catalog,
46 pub identity: IdentityId,
47}
48
49pub trait GeneratorFunction: Send + Sync {
50 fn generate<'a>(&self, ctx: GeneratorContext<'a>) -> GeneratorFunctionResult<Columns>;
51}
52
53pub struct ScalarFunctionContext<'a> {
54 pub fragment: Fragment,
55 pub columns: &'a Columns,
56 pub row_count: usize,
57 pub clock: &'a Clock,
58 pub identity: IdentityId,
59}
60pub trait ScalarFunction: Send + Sync {
61 fn scalar<'a>(&'a self, ctx: ScalarFunctionContext<'a>) -> ScalarFunctionResult<ColumnData>;
62 fn return_type(&self, input_types: &[Type]) -> Type;
63}
64
65pub struct AggregateFunctionContext<'a> {
66 pub fragment: Fragment,
67 pub column: &'a Column,
68 pub groups: &'a GroupByView,
69}
70
71pub trait AggregateFunction: Send + Sync {
72 fn aggregate<'a>(&'a mut self, ctx: AggregateFunctionContext<'a>) -> AggregateFunctionResult<()>;
73 fn finalize(&mut self) -> AggregateFunctionResult<(Vec<GroupKey>, ColumnData)>;
74}
75
76pub fn propagate_options(
88 func: &dyn ScalarFunction,
89 ctx: &ScalarFunctionContext,
90) -> Option<ScalarFunctionResult<ColumnData>> {
91 let has_option = ctx.columns.iter().any(|c| matches!(c.data(), ColumnData::Option { .. }));
92 if !has_option {
93 return None;
94 }
95
96 let mut combined_bv: Option<BitVec> = None;
97 let mut unwrapped = Vec::with_capacity(ctx.columns.len());
98 for col in ctx.columns.iter() {
99 let (inner, bv) = col.data().unwrap_option();
100 if let Some(bv) = bv {
101 combined_bv = Some(match combined_bv {
102 Some(existing) => existing.and(bv),
103 None => bv.clone(),
104 });
105 }
106 unwrapped.push(Column::new(col.name().clone(), inner.clone()));
107 }
108
109 if let Some(ref bv) = combined_bv {
113 if bv.count_ones() == 0 {
114 let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data().get_type()).collect();
115 let result_type = func.return_type(&input_types);
116 return Some(Ok(ColumnData::none_typed(result_type, ctx.row_count)));
117 }
118 }
119
120 let unwrapped_columns = Columns::new(unwrapped);
121 let result = func.scalar(ScalarFunctionContext {
122 fragment: ctx.fragment.clone(),
123 columns: &unwrapped_columns,
124 row_count: ctx.row_count,
125 clock: ctx.clock,
126 identity: ctx.identity,
127 });
128
129 Some(result.map(|data| match combined_bv {
130 Some(bv) => ColumnData::Option {
131 inner: Box::new(data),
132 bitvec: bv,
133 },
134 None => data,
135 }))
136}