Skip to main content

reifydb_routine/routine/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4pub mod context;
5pub mod error;
6pub mod registry;
7
8use error::RoutineError;
9use reifydb_core::value::column::{
10	ColumnWithName,
11	buffer::ColumnBuffer,
12	columns::Columns,
13	view::group_by::{GroupByView, GroupKey},
14};
15use reifydb_type::{
16	fragment::Fragment,
17	util::bitvec::BitVec,
18	value::r#type::{Type, input_types::InputTypes},
19};
20use serde::{Deserialize, Serialize};
21
/// Private home of the `Sealed` supertrait. The module itself is not `pub`,
/// so `Sealed` cannot be named (and therefore not implemented) from outside
/// the enclosing module tree.
mod sealed {
	/// Supertrait required by `Context`; carries no methods of its own.
	pub trait Sealed {}
}
25
26/// Sealed marker trait for execution contexts. Only `FunctionContext` and
27/// `ProcedureContext` implement it. Sealing prevents third parties from
28/// introducing a third routine flavour and breaking the registry's invariants.
29pub trait Context: Send + Sync + sealed::Sealed {}
30
31/// Function flavour. Lives on the function branch only; procedures don't have
32/// a kind discriminator (being a procedure IS the answer).
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34pub enum FunctionKind {
35	/// Vectorised, 1 row in -> 1 row out.
36	Scalar,
37	/// Accumulator-based aggregation across N rows -> 1 row per group.
38	Aggregate,
39	/// Table-valued: 1 row in -> N rows out.
40	Generator,
41}
42
/// Descriptive metadata attached to a routine.
#[derive(Clone, Debug)]
pub struct RoutineInfo {
	/// Name of the routine.
	pub name: String,
	/// Optional human-readable description; `new` leaves it unset.
	pub description: Option<String>,
}

impl RoutineInfo {
	/// Builds metadata for a routine called `name`, with no description.
	pub fn new(name: &str) -> Self {
		let name = String::from(name);
		let description = None;
		Self {
			name,
			description,
		}
	}
}
57
58/// The generic, function-and-procedure-agnostic contract. Implementors pick a
59/// context type (`FunctionContext` or `ProcedureContext`); that choice IS the
60/// function-vs-procedure declaration and statically determines whether the
61/// routine can access the transaction.
62///
63/// Function-only concerns (`kinds`, `accumulator`) live on the `Function`
64/// sub-trait. Procedures get a marker sub-trait `Procedure` with a blanket
65/// impl, so existing procedure impls require no extra boilerplate.
66pub trait Routine<C: Context>: Send + Sync {
67	fn info(&self) -> &RoutineInfo;
68
69	fn return_type(&self, input_types: &[Type]) -> Type;
70
71	fn accepted_types(&self) -> InputTypes {
72		InputTypes::any()
73	}
74
75	fn propagates_options(&self) -> bool {
76		true
77	}
78
79	/// Execute the routine.
80	///
81	/// Takes `ctx` by `&mut` so procedure routines can reborrow
82	/// `ctx.tx` as `&mut Transaction`. Function routines don't mutate the
83	/// context  - the `&mut` is a no-op for them, since the env fields are
84	/// shared references whose mutability isn't projected through.
85	fn execute(&self, ctx: &mut C, args: &Columns) -> Result<Columns, RoutineError>;
86
87	/// Calls the routine, automatically propagating Option columns if
88	/// `propagates_options()` returns true. The option-propagation behaviour
89	/// is identical for both contexts, hence the shared default.
90	fn call(&self, ctx: &mut C, args: &Columns) -> Result<Columns, RoutineError> {
91		if !self.propagates_options() {
92			return self.execute(ctx, args);
93		}
94
95		let has_option = args.iter().any(|c| matches!(c.data(), ColumnBuffer::Option { .. }));
96		if !has_option {
97			return self.execute(ctx, args);
98		}
99
100		let mut combined_bv: Option<BitVec> = None;
101		let mut unwrapped = Vec::with_capacity(args.len());
102		for col in args.iter() {
103			let (inner, bv) = col.data().unwrap_option();
104			if let Some(bv) = bv {
105				combined_bv = Some(match combined_bv {
106					Some(existing) => existing.and(bv),
107					None => bv.clone(),
108				});
109			}
110			unwrapped.push(ColumnWithName::new(col.name().clone(), inner.clone()));
111		}
112
113		// Short-circuit: when all combined values are None, skip the inner routine
114		// call entirely to avoid type-validation errors on placeholder inner types.
115		if let Some(ref bv) = combined_bv
116			&& bv.count_ones() == 0
117		{
118			let row_count = args.row_count();
119			let input_types: Vec<Type> = unwrapped.iter().map(|c| c.data.get_type()).collect();
120			let result_type = self.return_type(&input_types);
121			let result_data = ColumnBuffer::none_typed(result_type, row_count);
122			return Ok(Columns::new(vec![ColumnWithName::new(
123				Fragment::internal(self.info().name.clone()),
124				result_data,
125			)]));
126		}
127
128		let unwrapped_args = Columns::new(unwrapped);
129		let result = self.execute(ctx, &unwrapped_args)?;
130
131		match combined_bv {
132			Some(bv) => {
133				let wrapped_cols: Vec<ColumnWithName> = result
134					.names
135					.iter()
136					.zip(result.columns.iter())
137					.map(|(name, data)| {
138						ColumnWithName::new(
139							name.clone(),
140							ColumnBuffer::Option {
141								inner: Box::new(data.clone()),
142								bitvec: bv.clone(),
143							},
144						)
145					})
146					.collect();
147				Ok(Columns::new(wrapped_cols))
148			}
149			None => Ok(result),
150		}
151	}
152}
153
154/// Function-specific extension of `Routine`. Carries the kind discriminator
155/// and the optional aggregate accumulator factory. Procedures do not see these
156/// methods.
157pub trait Function: for<'a> Routine<context::FunctionContext<'a>> {
158	/// The execution shapes this function supports (Scalar, Aggregate,
159	/// Generator). Required: every function declares at least one kind.
160	fn kinds(&self) -> &[FunctionKind];
161
162	/// Aggregate accumulator factory. Only functions whose `kinds()` includes
163	/// `FunctionKind::Aggregate` need to override this.
164	fn accumulator(&self, _ctx: &mut context::FunctionContext<'_>) -> Option<Box<dyn Accumulator>> {
165		None
166	}
167}
168
169/// Procedure marker. Empty: every implementor of
170/// `Routine<ProcedureContext<'_, '_>>` is automatically a `Procedure` via the
171/// blanket impl below. Exists so `dyn Procedure` is a real type and so
172/// procedure-only methods have an obvious home if we add any later.
173pub trait Procedure: for<'a, 'tx> Routine<context::ProcedureContext<'a, 'tx>> {}
174
175impl<T: ?Sized> Procedure for T where T: for<'a, 'tx> Routine<context::ProcedureContext<'a, 'tx>> {}
176
177/// Aggregate accumulator. Stateful per-group reducer that consumes column
178/// batches via `update` and produces final group results via `finalize`.
179pub trait Accumulator: Send + Sync {
180	fn update(&mut self, args: &Columns, groups: &GroupByView) -> Result<(), RoutineError>;
181	fn finalize(&mut self) -> Result<(Vec<GroupKey>, ColumnBuffer), RoutineError>;
182}