use std::sync::Arc;
use reifydb_core::value::column::{columns::Columns, headers::ColumnHeaders};
use reifydb_routine::{
function::{Function, FunctionContext},
procedure::{Procedure, context::ProcedureContext},
};
use reifydb_rql::expression::Expression;
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{fragment::Fragment, params::Params, value::Value};
use crate::{
Result,
error::EngineError,
expression::{context::EvalContext, eval::evaluate},
vm::volcano::query::{QueryContext, QueryNode},
};
enum GeneratorImpl {
Function(Arc<dyn Function>),
Procedure(Box<dyn Procedure>),
}
pub(crate) struct GeneratorNode {
function_name: Fragment,
expressions: Vec<Expression>,
context: Option<Arc<QueryContext>>,
exhausted: bool,
generator: Option<GeneratorImpl>,
}
impl GeneratorNode {
pub fn new(function_name: Fragment, parameter_expressions: Vec<Expression>) -> Self {
Self {
function_name,
expressions: parameter_expressions,
context: None,
exhausted: false,
generator: None,
}
}
}
impl QueryNode for GeneratorNode {
fn initialize<'a>(&mut self, _txn: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
self.context = Some(Arc::new(ctx.clone()));
let name = self.function_name.text();
if let Some(func) = ctx.services.functions.get_generator(name) {
self.generator = Some(GeneratorImpl::Function(func));
} else if let Some(proc) = ctx.services.procedures.get_procedure(name) {
self.generator = Some(GeneratorImpl::Procedure(proc));
} else {
return Err(EngineError::GeneratorNotFound {
name: name.to_string(),
fragment: self.function_name.clone(),
}
.into());
}
self.exhausted = false;
Ok(())
}
fn next<'a>(&mut self, txn: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
if self.exhausted {
return Ok(None);
}
let stored_ctx = self.context.as_ref().unwrap();
let session = EvalContext::from_query(stored_ctx);
let evaluation_ctx = session.with_eval_empty();
let mut evaluated_columns = Vec::new();
for expr in &self.expressions {
let column = evaluate(&evaluation_ctx, expr)?;
evaluated_columns.push(column);
}
let columns = match self.generator.as_ref().unwrap() {
GeneratorImpl::Function(generator) => {
let evaluated_params = Columns::new(evaluated_columns);
let fn_ctx = FunctionContext::new(
self.function_name.clone(),
&stored_ctx.services.runtime_context,
stored_ctx.identity,
evaluated_params.row_count(),
);
generator.call(&fn_ctx, &evaluated_params)?
}
GeneratorImpl::Procedure(procedure) => {
let values: Vec<Value> =
evaluated_columns.iter().map(|col| col.data().get_value(0)).collect();
let params = Params::Positional(Arc::new(values));
let proc_ctx = ProcedureContext {
params: ¶ms,
catalog: &stored_ctx.services.catalog,
functions: &stored_ctx.services.functions,
runtime_context: &stored_ctx.services.runtime_context,
ioc: &stored_ctx.services.ioc,
};
procedure.call(&proc_ctx, txn)?
}
};
self.exhausted = true;
Ok(Some(columns))
}
fn headers(&self) -> Option<ColumnHeaders> {
None
}
}