reifydb_routine/routine/
mod.rs1pub mod context;
5pub mod error;
6pub mod registry;
7
8use error::RoutineError;
9use reifydb_core::value::column::{
10 ColumnWithName,
11 buffer::ColumnBuffer,
12 columns::Columns,
13 view::group_by::{GroupByView, GroupKey},
14};
15use reifydb_type::{
16 fragment::Fragment,
17 util::bitvec::BitVec,
18 value::r#type::{Type, input_types::InputTypes},
19};
20use serde::{Deserialize, Serialize};
21
22mod sealed {
23 pub trait Sealed {}
24}
25
26pub trait Context: Send + Sync + sealed::Sealed {}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34pub enum FunctionKind {
35 Scalar,
37 Aggregate,
39 Generator,
41}
42
43#[derive(Debug, Clone)]
44pub struct RoutineInfo {
45 pub name: String,
46 pub description: Option<String>,
47}
48
49impl RoutineInfo {
50 pub fn new(name: &str) -> Self {
51 Self {
52 name: name.to_string(),
53 description: None,
54 }
55 }
56}
57
58pub trait Routine<C: Context>: Send + Sync {
67 fn info(&self) -> &RoutineInfo;
68
69 fn return_type(&self, input_types: &[Type]) -> Type;
70
71 fn accepted_types(&self) -> InputTypes {
72 InputTypes::any()
73 }
74
75 fn propagates_options(&self) -> bool {
76 true
77 }
78
79 fn execute(&self, ctx: &mut C, args: &Columns) -> Result<Columns, RoutineError>;
86
87 fn call(&self, ctx: &mut C, args: &Columns) -> Result<Columns, RoutineError> {
91 if !self.propagates_options() {
92 return self.execute(ctx, args);
93 }
94
95 let has_option = args.iter().any(|c| matches!(c.data(), ColumnBuffer::Option { .. }));
96 if !has_option {
97 return self.execute(ctx, args);
98 }
99
100 let mut combined_bv: Option<BitVec> = None;
101 let mut unwrapped = Vec::with_capacity(args.len());
102 for col in args.iter() {
103 let (inner, bv) = col.data().unwrap_option();
104 if let Some(bv) = bv {
105 combined_bv = Some(match combined_bv {
106 Some(existing) => existing.and(bv),
107 None => bv.clone(),
108 });
109 }
110 unwrapped.push(ColumnWithName::new(col.name().clone(), inner.clone()));
111 }
112
113 if let Some(ref bv) = combined_bv
116 && bv.count_ones() == 0
117 {
118 let row_count = args.row_count();
119 let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data.get_type()).collect();
120 let result_type = self.return_type(&input_types);
121 let result_data = ColumnBuffer::none_typed(result_type, row_count);
122 return Ok(Columns::new(vec![ColumnWithName::new(
123 Fragment::internal(self.info().name.clone()),
124 result_data,
125 )]));
126 }
127
128 let unwrapped_args = Columns::new(unwrapped);
129 let result = self.execute(ctx, &unwrapped_args)?;
130
131 match combined_bv {
132 Some(bv) => {
133 let wrapped_cols: Vec<ColumnWithName> = result
134 .names
135 .iter()
136 .zip(result.columns.iter())
137 .map(|(name, data)| {
138 ColumnWithName::new(
139 name.clone(),
140 ColumnBuffer::Option {
141 inner: Box::new(data.clone()),
142 bitvec: bv.clone(),
143 },
144 )
145 })
146 .collect();
147 Ok(Columns::new(wrapped_cols))
148 }
149 None => Ok(result),
150 }
151 }
152}
153
154pub trait Function: for<'a> Routine<context::FunctionContext<'a>> {
158 fn kinds(&self) -> &[FunctionKind];
161
162 fn accumulator(&self, _ctx: &mut context::FunctionContext<'_>) -> Option<Box<dyn Accumulator>> {
165 None
166 }
167}
168
169pub trait Procedure: for<'a, 'tx> Routine<context::ProcedureContext<'a, 'tx>> {}
174
175impl<T: ?Sized> Procedure for T where T: for<'a, 'tx> Routine<context::ProcedureContext<'a, 'tx>> {}
176
177pub trait Accumulator: Send + Sync {
180 fn update(&mut self, args: &Columns, groups: &GroupByView) -> Result<(), RoutineError>;
181 fn finalize(&mut self) -> Result<(Vec<GroupKey>, ColumnBuffer), RoutineError>;
182}