Skip to main content

reifydb_engine/vm/volcano/
generator.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::value::column::{columns::Columns, headers::ColumnHeaders};
7use reifydb_routine::routine::{
8	Function, Procedure,
9	context::{FunctionContext, ProcedureContext},
10};
11use reifydb_rql::expression::Expression;
12use reifydb_transaction::transaction::Transaction;
13use reifydb_type::{fragment::Fragment, params::Params, value::Value};
14
15use crate::{
16	Result,
17	error::EngineError,
18	expression::{context::EvalContext, eval::evaluate},
19	vm::volcano::query::{QueryContext, QueryNode},
20};
21
22enum GeneratorImpl {
23	Function(Arc<dyn Function>),
24	Procedure(Arc<dyn Procedure>),
25}
26
27pub(crate) struct GeneratorNode {
28	function_name: Fragment,
29	expressions: Vec<Expression>,
30	context: Option<Arc<QueryContext>>,
31	exhausted: bool,
32	generator: Option<GeneratorImpl>,
33}
34
35impl GeneratorNode {
36	pub fn new(function_name: Fragment, parameter_expressions: Vec<Expression>) -> Self {
37		Self {
38			function_name,
39			expressions: parameter_expressions,
40			context: None,
41			exhausted: false,
42			generator: None,
43		}
44	}
45}
46
47impl QueryNode for GeneratorNode {
48	fn initialize<'a>(&mut self, _txn: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
49		self.context = Some(Arc::new(ctx.clone()));
50
51		let name = self.function_name.text();
52		if let Some(func) = ctx.services.routines.get_generator_function(name) {
53			self.generator = Some(GeneratorImpl::Function(func));
54		} else if let Some(proc) = ctx.services.routines.get_procedure(name) {
55			self.generator = Some(GeneratorImpl::Procedure(proc));
56		} else {
57			return Err(EngineError::GeneratorNotFound {
58				name: name.to_string(),
59				fragment: self.function_name.clone(),
60			}
61			.into());
62		}
63
64		self.exhausted = false;
65		Ok(())
66	}
67
68	fn next<'a>(&mut self, txn: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
69		if self.exhausted {
70			return Ok(None);
71		}
72
73		let stored_ctx = self.context.as_ref().unwrap();
74
75		let session = EvalContext::from_query(stored_ctx);
76		let evaluation_ctx = session.with_eval_empty();
77
78		let mut evaluated_columns = Vec::new();
79		for expr in &self.expressions {
80			let column = evaluate(&evaluation_ctx, expr)?;
81			evaluated_columns.push(column);
82		}
83
84		let columns = match self.generator.as_ref().unwrap() {
85			GeneratorImpl::Function(generator) => {
86				let evaluated_params = Columns::new(evaluated_columns);
87				let mut fn_ctx = FunctionContext {
88					fragment: self.function_name.clone(),
89					identity: stored_ctx.identity,
90					row_count: evaluated_params.row_count(),
91					runtime_context: &stored_ctx.services.runtime_context,
92				};
93				generator.call(&mut fn_ctx, &evaluated_params)?
94			}
95			GeneratorImpl::Procedure(procedure) => {
96				let values: Vec<Value> =
97					evaluated_columns.iter().map(|col| col.data().get_value(0)).collect();
98				let params = Params::Positional(Arc::new(values));
99				let mut proc_ctx = ProcedureContext {
100					fragment: self.function_name.clone(),
101					identity: stored_ctx.identity,
102					row_count: 1,
103					runtime_context: &stored_ctx.services.runtime_context,
104					tx: txn,
105					params: &params,
106					catalog: &stored_ctx.services.catalog,
107					ioc: &stored_ctx.services.ioc,
108				};
109				let empty = Columns::empty();
110				procedure.call(&mut proc_ctx, &empty)?
111			}
112		};
113
114		self.exhausted = true;
115
116		Ok(Some(columns))
117	}
118
119	fn headers(&self) -> Option<ColumnHeaders> {
120		None
121	}
122}