reifydb_routine/routine/
mod.rs1pub mod context;
10pub mod error;
11pub mod registry;
12
13use error::RoutineError;
14use reifydb_core::value::column::{
15 ColumnWithName,
16 buffer::ColumnBuffer,
17 columns::Columns,
18 view::group_by::{GroupByView, GroupKey},
19};
20use reifydb_type::{
21 fragment::Fragment,
22 util::bitvec::BitVec,
23 value::r#type::{Type, input_types::InputTypes},
24};
25use serde::{Deserialize, Serialize};
26
/// Private module holding the marker trait used to seal [`Context`].
///
/// The module itself is not `pub`, so `Sealed` cannot be named (and therefore
/// cannot be implemented) from outside this module tree.
mod sealed {
    /// Marker trait with no methods; a required supertrait of `Context`.
    pub trait Sealed {}
}
30
31pub trait Context: Send + Sync + sealed::Sealed {}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34pub enum FunctionKind {
35 Scalar,
36
37 Aggregate,
38
39 Generator,
40}
41
/// Descriptor for a routine: its name and an optional description.
#[derive(Debug, Clone)]
pub struct RoutineInfo {
    /// Name of the routine.
    pub name: String,
    /// Optional free-form description; `None` when not provided.
    pub description: Option<String>,
}

impl RoutineInfo {
    /// Creates a descriptor holding an owned copy of `name` and no description.
    pub fn new(name: &str) -> Self {
        let name = String::from(name);
        Self {
            name,
            description: None,
        }
    }
}
56
57pub trait Routine<C: Context>: Send + Sync {
58 fn info(&self) -> &RoutineInfo;
59
60 fn return_type(&self, input_types: &[Type]) -> Type;
61
62 fn accepted_types(&self) -> InputTypes {
63 InputTypes::any()
64 }
65
66 fn propagates_options(&self) -> bool {
67 true
68 }
69
70 fn attaches_row_metadata(&self) -> bool {
71 true
72 }
73
74 fn execute(&self, ctx: &mut C, args: &Columns) -> Result<Columns, RoutineError>;
75
76 fn call(&self, ctx: &mut C, args: &Columns) -> Result<Columns, RoutineError> {
77 if !self.propagates_options() {
78 return self.execute(ctx, args);
79 }
80
81 let has_option = args.iter().any(|c| matches!(c.data(), ColumnBuffer::Option { .. }));
82 if !has_option {
83 return self.execute(ctx, args);
84 }
85
86 let mut combined_bv: Option<BitVec> = None;
87 let mut unwrapped = Vec::with_capacity(args.len());
88 for col in args.iter() {
89 let (inner, bv) = col.data().unwrap_option();
90 if let Some(bv) = bv {
91 combined_bv = Some(match combined_bv {
92 Some(existing) => existing.and(bv),
93 None => bv.clone(),
94 });
95 }
96 unwrapped.push(ColumnWithName::new(col.name().clone(), inner.clone()));
97 }
98
99 if let Some(ref bv) = combined_bv
100 && bv.count_ones() == 0
101 {
102 let row_count = args.row_count();
103 let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data.get_type()).collect();
104 let result_type = self.return_type(&input_types);
105 let result_data = ColumnBuffer::none_typed(result_type, row_count);
106 return Ok(Columns::new(vec![ColumnWithName::new(
107 Fragment::internal(self.info().name.clone()),
108 result_data,
109 )]));
110 }
111
112 let unwrapped_args = Columns::new(unwrapped);
113 let result = self.execute(ctx, &unwrapped_args)?;
114
115 match combined_bv {
116 Some(bv) => {
117 let wrapped_cols: Vec<ColumnWithName> = result
118 .names
119 .iter()
120 .zip(result.columns.iter())
121 .map(|(name, data)| {
122 ColumnWithName::new(
123 name.clone(),
124 ColumnBuffer::Option {
125 inner: Box::new(data.clone()),
126 bitvec: bv.clone(),
127 },
128 )
129 })
130 .collect();
131 Ok(Columns::new(wrapped_cols))
132 }
133 None => Ok(result),
134 }
135 }
136}
137
138pub trait Function: for<'a> Routine<context::FunctionContext<'a>> {
139 fn kinds(&self) -> &[FunctionKind];
140
141 fn accumulator(&self, _ctx: &mut context::FunctionContext<'_>) -> Option<Box<dyn Accumulator>> {
142 None
143 }
144}
145
146pub trait Procedure: for<'a, 'tx> Routine<context::ProcedureContext<'a, 'tx>> {}
147
148impl<T: ?Sized> Procedure for T where T: for<'a, 'tx> Routine<context::ProcedureContext<'a, 'tx>> {}
149
150pub trait Accumulator: Send + Sync {
151 fn update(&mut self, args: &Columns, groups: &GroupByView) -> Result<(), RoutineError>;
152 fn finalize(&mut self) -> Result<(Vec<GroupKey>, ColumnBuffer), RoutineError>;
153}