reifydb_engine/vm/volcano/
generator.rs1use std::sync::Arc;
5
6use reifydb_core::value::column::{columns::Columns, headers::ColumnHeaders};
7use reifydb_routine::routine::{
8 Function, Procedure,
9 context::{FunctionContext, ProcedureContext},
10};
11use reifydb_rql::expression::Expression;
12use reifydb_transaction::transaction::Transaction;
13use reifydb_type::{fragment::Fragment, params::Params, value::Value};
14
15use crate::{
16 Result,
17 error::EngineError,
18 expression::{context::EvalContext, eval::evaluate},
19 vm::volcano::query::{QueryContext, QueryNode},
20};
21
22enum GeneratorImpl {
23 Function(Arc<dyn Function>),
24 Procedure(Arc<dyn Procedure>),
25}
26
27pub(crate) struct GeneratorNode {
28 function_name: Fragment,
29 expressions: Vec<Expression>,
30 context: Option<Arc<QueryContext>>,
31 exhausted: bool,
32 generator: Option<GeneratorImpl>,
33}
34
35impl GeneratorNode {
36 pub fn new(function_name: Fragment, parameter_expressions: Vec<Expression>) -> Self {
37 Self {
38 function_name,
39 expressions: parameter_expressions,
40 context: None,
41 exhausted: false,
42 generator: None,
43 }
44 }
45}
46
47impl QueryNode for GeneratorNode {
48 fn initialize<'a>(&mut self, _txn: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
49 self.context = Some(Arc::new(ctx.clone()));
50
51 let name = self.function_name.text();
52 if let Some(func) = ctx.services.routines.get_generator_function(name) {
53 self.generator = Some(GeneratorImpl::Function(func));
54 } else if let Some(proc) = ctx.services.routines.get_procedure(name) {
55 self.generator = Some(GeneratorImpl::Procedure(proc));
56 } else {
57 return Err(EngineError::GeneratorNotFound {
58 name: name.to_string(),
59 fragment: self.function_name.clone(),
60 }
61 .into());
62 }
63
64 self.exhausted = false;
65 Ok(())
66 }
67
68 fn next<'a>(&mut self, txn: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
69 if self.exhausted {
70 return Ok(None);
71 }
72
73 let stored_ctx = self.context.as_ref().unwrap();
74
75 let session = EvalContext::from_query(stored_ctx);
76 let evaluation_ctx = session.with_eval_empty();
77
78 let mut evaluated_columns = Vec::new();
80 for expr in &self.expressions {
81 let column = evaluate(&evaluation_ctx, expr)?;
82 evaluated_columns.push(column);
83 }
84
85 let columns = match self.generator.as_ref().unwrap() {
86 GeneratorImpl::Function(generator) => {
87 let evaluated_params = Columns::new(evaluated_columns);
88 let mut fn_ctx = FunctionContext {
89 fragment: self.function_name.clone(),
90 identity: stored_ctx.identity,
91 row_count: evaluated_params.row_count(),
92 runtime_context: &stored_ctx.services.runtime_context,
93 };
94 generator.call(&mut fn_ctx, &evaluated_params)?
95 }
96 GeneratorImpl::Procedure(procedure) => {
97 let values: Vec<Value> =
98 evaluated_columns.iter().map(|col| col.data().get_value(0)).collect();
99 let params = Params::Positional(Arc::new(values));
100 let mut proc_ctx = ProcedureContext {
101 fragment: self.function_name.clone(),
102 identity: stored_ctx.identity,
103 row_count: 1,
104 runtime_context: &stored_ctx.services.runtime_context,
105 tx: txn,
106 params: ¶ms,
107 catalog: &stored_ctx.services.catalog,
108 ioc: &stored_ctx.services.ioc,
109 };
110 let empty = Columns::empty();
111 procedure.call(&mut proc_ctx, &empty)?
112 }
113 };
114
115 self.exhausted = true;
116
117 Ok(Some(columns))
118 }
119
120 fn headers(&self) -> Option<ColumnHeaders> {
121 None
122 }
123}