// reifydb_engine/vm/volcano/map.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{mem, sync::Arc};
5
6use reifydb_core::{
7	interface::{evaluate::TargetColumn, resolved::ResolvedColumn},
8	value::column::{ColumnWithName, columns::Columns, headers::ColumnHeaders},
9};
10use reifydb_extension::transform::{Transform, context::TransformContext};
11use reifydb_rql::expression::{Expression, name::display_label};
12use reifydb_transaction::transaction::Transaction;
13use reifydb_type::{fragment::Fragment, util::cowvec::CowVec};
14use tracing::instrument;
15
16use super::NoopNode;
17use crate::{
18	Result,
19	expression::{
20		cast::cast_column_data,
21		compile::{CompiledExpr, compile_expression},
22		context::{CompileContext, EvalContext},
23	},
24	vm::volcano::{
25		query::{QueryContext, QueryNode},
26		udf::{UdfEvalNode, evaluate_udfs_no_input, strip_udf_columns},
27	},
28};
29
/// Volcano operator that projects each input batch through a list of
/// expressions (one output column per expression).
pub(crate) struct MapNode {
	/// Upstream operator supplying the batches to project.
	input: Box<dyn QueryNode>,
	/// Expressions evaluated per batch; rewritten in `initialize()` when
	/// UDF calls are extracted into a wrapping `UdfEvalNode`.
	expressions: Vec<Expression>,
	/// Names of helper columns produced for UDF results; these are
	/// stripped from the output in `next()`.
	udf_names: Vec<String>,
	/// Output headers, one label per expression; populated in `initialize()`.
	headers: Option<ColumnHeaders>,
	/// Cloned query context plus pre-compiled expressions; populated in
	/// `initialize()`, `None` before that.
	context: Option<(Arc<QueryContext>, Vec<CompiledExpr>)>,
}
37
38impl MapNode {
39	pub fn new(input: Box<dyn QueryNode>, expressions: Vec<Expression>) -> Self {
40		Self {
41			input,
42			expressions,
43			udf_names: Vec::new(),
44			headers: None,
45			context: None,
46		}
47	}
48}
49
50impl QueryNode for MapNode {
51	#[instrument(name = "volcano::map::initialize", level = "trace", skip_all)]
52	fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
53		let (input, expressions, udf_names) = UdfEvalNode::wrap_if_needed(
54			mem::replace(&mut self.input, Box::new(NoopNode)),
55			&self.expressions,
56			&ctx.symbols,
57		);
58		self.input = input;
59		self.expressions = expressions;
60		self.udf_names = udf_names;
61
62		let compile_ctx = CompileContext {
63			symbols: &ctx.symbols,
64		};
65		let compiled = self
66			.expressions
67			.iter()
68			.map(|e| compile_expression(&compile_ctx, e).expect("compile"))
69			.collect();
70		self.context = Some((Arc::new(ctx.clone()), compiled));
71		let column_names = self.expressions.iter().map(display_label).collect();
72		self.headers = Some(ColumnHeaders {
73			columns: column_names,
74		});
75		self.input.initialize(rx, ctx)?;
76		Ok(())
77	}
78
79	#[instrument(name = "volcano::map::next", level = "trace", skip_all)]
80	fn next<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<Columns>> {
81		debug_assert!(self.context.is_some(), "MapNode::next() called before initialize()");
82
83		if let Some(columns) = self.input.next(rx, ctx)? {
84			let stored_ctx = &self.context.as_ref().unwrap().0;
85			let transform_ctx = TransformContext {
86				routines: &ctx.services.routines,
87				runtime_context: &stored_ctx.services.runtime_context,
88				params: &stored_ctx.params,
89			};
90			let mut result = self.apply(&transform_ctx, columns)?;
91			strip_udf_columns(&mut result, &self.udf_names);
92
93			Ok(Some(result))
94		} else {
95			Ok(None)
96		}
97	}
98
99	fn headers(&self) -> Option<ColumnHeaders> {
100		self.headers.clone().or(self.input.headers())
101	}
102}
103
impl Transform for MapNode {
	/// Evaluates every compiled expression against `input` and assembles the
	/// results into a new `Columns` batch, preserving the input's row numbers
	/// and created/updated timestamps.
	///
	/// For an aliased expression whose alias matches a column of the resolved
	/// source, the matching source column becomes the evaluation target, and
	/// the result is cast to the target column's type when they differ.
	fn apply(&self, ctx: &TransformContext, input: Columns) -> Result<Columns> {
		let (stored_ctx, compiled) =
			self.context.as_ref().expect("MapNode::apply() called before initialize()");

		let row_count = input.row_count();
		let session = EvalContext::from_transform(ctx, stored_ctx);
		let mut new_columns = Vec::with_capacity(compiled.len());

		for (expr, compiled_expr) in self.expressions.iter().zip(compiled.iter()) {
			// Fresh evaluation context per expression; the target column may
			// differ between expressions. NOTE(review): `input.clone()` per
			// expression — presumably cheap via CowVec sharing; confirm.
			let mut exec_ctx = session.with_eval(input.clone(), row_count);

			// If the expression is aliased and the query has a resolved
			// source, look up a source column with the same name as the alias
			// and make it the cast/validation target.
			if let (Expression::Alias(alias_expr), Some(source)) = (expr, &stored_ctx.source) {
				let alias_name = alias_expr.alias.name();
				if let Some(table_column) = source.columns().iter().find(|col| col.name == alias_name) {
					let column_ident = Fragment::internal(&table_column.name);
					let resolved_column =
						ResolvedColumn::new(column_ident, source.clone(), table_column.clone());
					exec_ctx.target = Some(TargetColumn::Resolved(resolved_column));
				}
			}

			let mut column = compiled_expr.execute(&exec_ctx)?;

			// Coerce the evaluated data to the target column's type when they
			// disagree; errors carry the expression's fragment for reporting.
			if let Some(target_type) = exec_ctx.target.as_ref().map(|t| t.column_type())
				&& column.data.get_type() != target_type
			{
				let data =
					cast_column_data(&exec_ctx, &column.data, target_type, &expr.lazy_fragment())?;
				column = ColumnWithName {
					name: column.name,
					data,
				};
			}

			new_columns.push(column);
		}

		// Split the named columns into parallel name/data vectors, the layout
		// `Columns` stores internally.
		let mut names_vec = Vec::with_capacity(new_columns.len());
		let mut buffers_vec = Vec::with_capacity(new_columns.len());
		for c in new_columns {
			names_vec.push(c.name);
			buffers_vec.push(c.data);
		}
		Ok(Columns {
			row_numbers: input.row_numbers,
			created_at: input.created_at,
			updated_at: input.updated_at,
			columns: CowVec::new(buffers_vec),
			names: CowVec::new(names_vec),
		})
	}
}
157
/// Volcano operator for a `MAP` without an input relation: evaluates its
/// expressions once and yields exactly one batch.
pub(crate) struct MapWithoutInputNode {
	/// Expressions to evaluate; rewritten in `initialize()` when UDFs are
	/// extracted and pre-computed.
	expressions: Vec<Expression>,
	/// Output headers; also doubles as the "batch already emitted" flag in
	/// `next()` — `Some` after the single batch has been produced.
	headers: Option<ColumnHeaders>,
	/// When UDFs are present, stores the rewritten expressions and pre-computed UDF result columns.
	udf_columns: Option<Columns>,
	/// Cloned query context plus pre-compiled expressions; populated in
	/// `initialize()`, `None` before that.
	context: Option<(Arc<QueryContext>, Vec<CompiledExpr>)>,
}
165
166impl MapWithoutInputNode {
167	pub fn new(expressions: Vec<Expression>) -> Self {
168		Self {
169			expressions,
170			headers: None,
171			udf_columns: None,
172			context: None,
173		}
174	}
175}
176
177impl QueryNode for MapWithoutInputNode {
178	#[instrument(name = "volcano::map::noinput::initialize", level = "trace", skip_all)]
179	fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
180		// Extract and evaluate UDFs if present
181		if let Some((rewritten, udf_cols)) = evaluate_udfs_no_input(&self.expressions, ctx, rx)? {
182			self.expressions = rewritten;
183			self.udf_columns = Some(udf_cols);
184		}
185
186		let compile_ctx = CompileContext {
187			symbols: &ctx.symbols,
188		};
189		let compiled = self
190			.expressions
191			.iter()
192			.map(|e| compile_expression(&compile_ctx, e).expect("compile"))
193			.collect();
194		self.context = Some((Arc::new(ctx.clone()), compiled));
195		Ok(())
196	}
197
198	#[instrument(name = "volcano::map::noinput::next", level = "trace", skip_all)]
199	fn next<'a>(&mut self, _rx: &mut Transaction<'a>, _ctx: &mut QueryContext) -> Result<Option<Columns>> {
200		debug_assert!(self.context.is_some(), "MapWithoutInputNode::next() called before initialize()");
201		let (stored_ctx, compiled) = self.context.as_ref().unwrap();
202
203		if self.headers.is_some() {
204			return Ok(None);
205		}
206
207		let session = EvalContext::from_query(stored_ctx);
208		let mut columns = vec![];
209
210		for compiled_expr in compiled {
211			// If we have UDF result columns, include them so __udf_N column refs resolve
212			let exec_ctx = match &self.udf_columns {
213				Some(udf_cols) => session.with_eval(udf_cols.clone(), 1),
214				None => session.with_eval_empty(),
215			};
216
217			let column = compiled_expr.execute(&exec_ctx)?;
218
219			columns.push(column);
220		}
221
222		let columns = Columns::new(columns);
223		self.headers = Some(ColumnHeaders::from_columns(&columns));
224		Ok(Some(columns))
225	}
226
227	fn headers(&self) -> Option<ColumnHeaders> {
228		self.headers.clone()
229	}
230}