Skip to main content

reifydb_sub_flow/operator/
map.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7	interface::{
8		catalog::flow::FlowNodeId,
9		change::{Change, Diff},
10	},
11	value::column::{Column, columns::Columns},
12};
13use reifydb_engine::{
14	expression::{
15		compile::{CompiledExpr, compile_expression},
16		context::{CompileContext, EvalSession},
17	},
18	vm::stack::SymbolTable,
19};
20use reifydb_routine::function::registry::Functions;
21use reifydb_rql::expression::Expression;
22use reifydb_runtime::context::RuntimeContext;
23use reifydb_type::{
24	Result,
25	fragment::Fragment,
26	params::Params,
27	value::{identity::IdentityId, row_number::RowNumber},
28};
29
30use crate::{Operator, operator::Operators, transaction::FlowTransaction};
31
32// Static empty params instance for use in EvaluationContext
33static EMPTY_PARAMS: Params = Params::None;
34static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
35
36pub struct MapOperator {
37	parent: Arc<Operators>,
38	node: FlowNodeId,
39	expressions: Vec<Expression>,
40	compiled_expressions: Vec<CompiledExpr>,
41	functions: Functions,
42	runtime_context: RuntimeContext,
43}
44
45impl MapOperator {
46	pub fn new(
47		parent: Arc<Operators>,
48		node: FlowNodeId,
49		expressions: Vec<Expression>,
50		functions: Functions,
51		runtime_context: RuntimeContext,
52	) -> Self {
53		let compile_ctx = CompileContext {
54			functions: &functions,
55			symbols: &EMPTY_SYMBOL_TABLE,
56		};
57		let compiled_expressions: Vec<CompiledExpr> = expressions
58			.iter()
59			.map(|e| compile_expression(&compile_ctx, e))
60			.collect::<Result<Vec<_>>>()
61			.expect("Failed to compile expressions");
62
63		Self {
64			parent,
65			node,
66			expressions,
67			compiled_expressions,
68			functions,
69			runtime_context,
70		}
71	}
72
73	/// Project all rows in Columns using expressions
74	fn project(&self, columns: &Columns) -> Result<Columns> {
75		let row_count = columns.row_count();
76		if row_count == 0 {
77			return Ok(Columns::empty());
78		}
79
80		let session = EvalSession {
81			params: &EMPTY_PARAMS,
82			symbols: &EMPTY_SYMBOL_TABLE,
83			functions: &self.functions,
84			runtime_context: &self.runtime_context,
85			arena: None,
86			identity: IdentityId::root(),
87			is_aggregate_context: false,
88		};
89		let exec_ctx = session.eval(columns.clone(), row_count);
90
91		let mut result_columns = Vec::with_capacity(self.expressions.len());
92
93		for (i, compiled_expr) in self.compiled_expressions.iter().enumerate() {
94			let evaluated_col = compiled_expr.execute(&exec_ctx)?;
95
96			let expr = &self.expressions[i];
97			let field_name = match expr {
98				Expression::Alias(alias_expr) => alias_expr.alias.name().to_string(),
99				Expression::Column(col_expr) => col_expr.0.name.text().to_string(),
100				Expression::AccessSource(access_expr) => access_expr.column.name.text().to_string(),
101				_ => expr.full_fragment_owned().text().to_string(),
102			};
103
104			let named_column = Column {
105				name: Fragment::internal(field_name),
106				data: evaluated_col.data().clone(),
107			};
108
109			result_columns.push(named_column);
110		}
111
112		let row_numbers = if columns.row_numbers.is_empty() {
113			Vec::new()
114		} else {
115			columns.row_numbers.iter().cloned().collect()
116		};
117
118		Ok(Columns::with_row_numbers(result_columns, row_numbers))
119	}
120}
121
122impl Operator for MapOperator {
123	fn id(&self) -> FlowNodeId {
124		self.node
125	}
126
127	fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
128		let mut result = Vec::new();
129
130		for diff in change.diffs.into_iter() {
131			match diff {
132				Diff::Insert {
133					post,
134				} => {
135					let projected = match self.project(&post) {
136						Ok(projected) => projected,
137						Err(err) => {
138							panic!("{:#?}", err)
139						}
140					};
141
142					if !projected.is_empty() {
143						result.push(Diff::Insert {
144							post: projected,
145						});
146					}
147				}
148				Diff::Update {
149					pre,
150					post,
151				} => {
152					let projected_post = self.project(&post)?;
153					let projected_pre = self.project(&pre)?;
154
155					if !projected_post.is_empty() {
156						result.push(Diff::Update {
157							pre: projected_pre,
158							post: projected_post,
159						});
160					}
161				}
162				Diff::Remove {
163					pre,
164				} => {
165					let projected_pre = self.project(&pre)?;
166					if !projected_pre.is_empty() {
167						result.push(Diff::Remove {
168							pre: projected_pre,
169						});
170					}
171				}
172			}
173		}
174
175		Ok(Change::from_flow(self.node, change.version, result))
176	}
177
178	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
179		self.parent.pull(txn, rows)
180	}
181}