Skip to main content

reifydb_routine/routine/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Common machinery shared by both functions and procedures: the routine registry the engine consults at
5//! invocation time, the per-call context that exposes engine services to a routine body, and the typed errors a
6//! routine can return. The split between function and procedure semantics happens above this module; everything
7//! here is name resolution, argument binding, and result shaping that both kinds need.
8
9pub mod context;
10pub mod error;
11pub mod registry;
12
13use error::RoutineError;
14use reifydb_core::value::column::{
15	ColumnWithName,
16	buffer::ColumnBuffer,
17	columns::Columns,
18	view::group_by::{GroupByView, GroupKey},
19};
20use reifydb_type::{
21	fragment::Fragment,
22	util::bitvec::BitVec,
23	value::r#type::{Type, input_types::InputTypes},
24};
25use serde::{Deserialize, Serialize};
26
/// Private home of the [`Sealed`] marker. Because this module is not `pub`, code outside the enclosing
/// module tree cannot name `Sealed`, and therefore cannot implement any trait that requires it.
mod sealed {
	/// Marker supertrait with no methods; it exists only to gate who may implement the traits bounded on it.
	pub trait Sealed {}
}
30
31pub trait Context: Send + Sync + sealed::Sealed {}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34pub enum FunctionKind {
35	Scalar,
36
37	Aggregate,
38
39	Generator,
40}
41
/// Descriptor of a routine: its name plus an optional human-readable description.
#[derive(Debug, Clone)]
pub struct RoutineInfo {
	/// Name of the routine.
	pub name: String,
	/// Optional description; [`RoutineInfo::new`] leaves it unset.
	pub description: Option<String>,
}

impl RoutineInfo {
	/// Builds a descriptor carrying an owned copy of `name` and no description.
	pub fn new(name: &str) -> Self {
		let name = String::from(name);
		Self {
			name,
			description: None,
		}
	}
}
56
57pub trait Routine<C: Context>: Send + Sync {
58	fn info(&self) -> &RoutineInfo;
59
60	fn return_type(&self, input_types: &[Type]) -> Type;
61
62	fn accepted_types(&self) -> InputTypes {
63		InputTypes::any()
64	}
65
66	fn propagates_options(&self) -> bool {
67		true
68	}
69
70	fn attaches_row_metadata(&self) -> bool {
71		true
72	}
73
74	fn execute(&self, ctx: &mut C, args: &Columns) -> Result<Columns, RoutineError>;
75
76	fn call(&self, ctx: &mut C, args: &Columns) -> Result<Columns, RoutineError> {
77		if !self.propagates_options() {
78			return self.execute(ctx, args);
79		}
80
81		let has_option = args.iter().any(|c| matches!(c.data(), ColumnBuffer::Option { .. }));
82		if !has_option {
83			return self.execute(ctx, args);
84		}
85
86		let mut combined_bv: Option<BitVec> = None;
87		let mut unwrapped = Vec::with_capacity(args.len());
88		for col in args.iter() {
89			let (inner, bv) = col.data().unwrap_option();
90			if let Some(bv) = bv {
91				combined_bv = Some(match combined_bv {
92					Some(existing) => existing.and(bv),
93					None => bv.clone(),
94				});
95			}
96			unwrapped.push(ColumnWithName::new(col.name().clone(), inner.clone()));
97		}
98
99		if let Some(ref bv) = combined_bv
100			&& bv.count_ones() == 0
101		{
102			let row_count = args.row_count();
103			let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data.get_type()).collect();
104			let result_type = self.return_type(&input_types);
105			let result_data = ColumnBuffer::none_typed(result_type, row_count);
106			return Ok(Columns::new(vec![ColumnWithName::new(
107				Fragment::internal(self.info().name.clone()),
108				result_data,
109			)]));
110		}
111
112		let unwrapped_args = Columns::new(unwrapped);
113		let result = self.execute(ctx, &unwrapped_args)?;
114
115		match combined_bv {
116			Some(bv) => {
117				let wrapped_cols: Vec<ColumnWithName> = result
118					.names
119					.iter()
120					.zip(result.columns.iter())
121					.map(|(name, data)| {
122						ColumnWithName::new(
123							name.clone(),
124							ColumnBuffer::Option {
125								inner: Box::new(data.clone()),
126								bitvec: bv.clone(),
127							},
128						)
129					})
130					.collect();
131				Ok(Columns::new(wrapped_cols))
132			}
133			None => Ok(result),
134		}
135	}
136}
137
138pub trait Function: for<'a> Routine<context::FunctionContext<'a>> {
139	fn kinds(&self) -> &[FunctionKind];
140
141	fn accumulator(&self, _ctx: &mut context::FunctionContext<'_>) -> Option<Box<dyn Accumulator>> {
142		None
143	}
144}
145
146pub trait Procedure: for<'a, 'tx> Routine<context::ProcedureContext<'a, 'tx>> {}
147
148impl<T: ?Sized> Procedure for T where T: for<'a, 'tx> Routine<context::ProcedureContext<'a, 'tx>> {}
149
150pub trait Accumulator: Send + Sync {
151	fn update(&mut self, args: &Columns, groups: &GroupByView) -> Result<(), RoutineError>;
152	fn finalize(&mut self) -> Result<(Vec<GroupKey>, ColumnBuffer), RoutineError>;
153}