Skip to main content

reifydb_sub_flow/operator/
map.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7	interface::{
8		catalog::flow::FlowNodeId,
9		change::{Change, Diff},
10	},
11	value::column::{Column, columns::Columns},
12};
13use reifydb_engine::{
14	expression::{
15		compile::{CompiledExpr, compile_expression},
16		context::{CompileContext, EvalContext},
17	},
18	vm::stack::SymbolTable,
19};
20use reifydb_function::registry::Functions;
21use reifydb_rql::expression::Expression;
22use reifydb_runtime::clock::Clock;
23use reifydb_type::{fragment::Fragment, params::Params, value::row_number::RowNumber};
24
25use crate::{Operator, operator::Operators, transaction::FlowTransaction};
26
27// Static empty params instance for use in EvaluationContext
28static EMPTY_PARAMS: Params = Params::None;
29static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
30
31pub struct MapOperator {
32	parent: Arc<Operators>,
33	node: FlowNodeId,
34	expressions: Vec<Expression>,
35	compiled_expressions: Vec<CompiledExpr>,
36	functions: Functions,
37	clock: Clock,
38}
39
40impl MapOperator {
41	pub fn new(
42		parent: Arc<Operators>,
43		node: FlowNodeId,
44		expressions: Vec<Expression>,
45		functions: Functions,
46		clock: Clock,
47	) -> Self {
48		let compile_ctx = CompileContext {
49			functions: &functions,
50			symbol_table: &EMPTY_SYMBOL_TABLE,
51		};
52		let compiled_expressions: Vec<CompiledExpr> = expressions
53			.iter()
54			.map(|e| compile_expression(&compile_ctx, e))
55			.collect::<Result<Vec<_>, _>>()
56			.expect("Failed to compile expressions");
57
58		Self {
59			parent,
60			node,
61			expressions,
62			compiled_expressions,
63			functions,
64			clock,
65		}
66	}
67
68	/// Project all rows in Columns using expressions
69	fn project(&self, columns: &Columns) -> reifydb_type::Result<Columns> {
70		let row_count = columns.row_count();
71		if row_count == 0 {
72			return Ok(Columns::empty());
73		}
74
75		let exec_ctx = EvalContext {
76			target: None,
77			columns: columns.clone(),
78			row_count,
79			take: None,
80			params: &EMPTY_PARAMS,
81			symbol_table: &EMPTY_SYMBOL_TABLE,
82			is_aggregate_context: false,
83			functions: &self.functions,
84			clock: &self.clock,
85			arena: None,
86		};
87
88		let mut result_columns = Vec::with_capacity(self.expressions.len());
89
90		for (i, compiled_expr) in self.compiled_expressions.iter().enumerate() {
91			let evaluated_col = compiled_expr.execute(&exec_ctx)?;
92
93			let expr = &self.expressions[i];
94			let field_name = match expr {
95				Expression::Alias(alias_expr) => alias_expr.alias.name().to_string(),
96				Expression::Column(col_expr) => col_expr.0.name.text().to_string(),
97				Expression::AccessSource(access_expr) => access_expr.column.name.text().to_string(),
98				_ => expr.full_fragment_owned().text().to_string(),
99			};
100
101			let named_column = Column {
102				name: Fragment::internal(field_name),
103				data: evaluated_col.data().clone(),
104			};
105
106			result_columns.push(named_column);
107		}
108
109		let row_numbers = if columns.row_numbers.is_empty() {
110			Vec::new()
111		} else {
112			columns.row_numbers.iter().cloned().collect()
113		};
114
115		Ok(Columns::with_row_numbers(result_columns, row_numbers))
116	}
117}
118
119impl Operator for MapOperator {
120	fn id(&self) -> FlowNodeId {
121		self.node
122	}
123
124	fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> reifydb_type::Result<Change> {
125		let mut result = Vec::new();
126
127		for diff in change.diffs.into_iter() {
128			match diff {
129				Diff::Insert {
130					post,
131				} => {
132					let projected = match self.project(&post) {
133						Ok(projected) => projected,
134						Err(err) => {
135							panic!("{:#?}", err)
136						}
137					};
138
139					if !projected.is_empty() {
140						result.push(Diff::Insert {
141							post: projected,
142						});
143					}
144				}
145				Diff::Update {
146					pre,
147					post,
148				} => {
149					let projected_post = self.project(&post)?;
150					let projected_pre = self.project(&pre)?;
151
152					if !projected_post.is_empty() {
153						result.push(Diff::Update {
154							pre: projected_pre,
155							post: projected_post,
156						});
157					}
158				}
159				Diff::Remove {
160					pre,
161				} => {
162					let projected_pre = self.project(&pre)?;
163					if !projected_pre.is_empty() {
164						result.push(Diff::Remove {
165							pre: projected_pre,
166						});
167					}
168				}
169			}
170		}
171
172		Ok(Change::from_flow(self.node, change.version, result))
173	}
174
175	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> reifydb_type::Result<Columns> {
176		self.parent.pull(txn, rows)
177	}
178}