Skip to main content

reifydb_sub_flow/operator/
map.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7	interface::{
8		catalog::flow::FlowNodeId,
9		change::{Change, Diff},
10	},
11	value::column::{Column, columns::Columns},
12};
13use reifydb_engine::{
14	expression::{
15		compile::{CompiledExpr, compile_expression},
16		context::{CompileContext, EvalContext},
17	},
18	vm::stack::SymbolTable,
19};
20use reifydb_function::registry::Functions;
21use reifydb_rql::expression::Expression;
22use reifydb_runtime::clock::Clock;
23use reifydb_type::{
24	Result,
25	fragment::Fragment,
26	params::Params,
27	value::{identity::IdentityId, row_number::RowNumber},
28};
29
30use crate::{Operator, operator::Operators, transaction::FlowTransaction};
31
32// Static empty params instance for use in EvaluationContext
33static EMPTY_PARAMS: Params = Params::None;
34static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
35
36pub struct MapOperator {
37	parent: Arc<Operators>,
38	node: FlowNodeId,
39	expressions: Vec<Expression>,
40	compiled_expressions: Vec<CompiledExpr>,
41	functions: Functions,
42	clock: Clock,
43}
44
45impl MapOperator {
46	pub fn new(
47		parent: Arc<Operators>,
48		node: FlowNodeId,
49		expressions: Vec<Expression>,
50		functions: Functions,
51		clock: Clock,
52	) -> Self {
53		let compile_ctx = CompileContext {
54			functions: &functions,
55			symbol_table: &EMPTY_SYMBOL_TABLE,
56		};
57		let compiled_expressions: Vec<CompiledExpr> = expressions
58			.iter()
59			.map(|e| compile_expression(&compile_ctx, e))
60			.collect::<Result<Vec<_>>>()
61			.expect("Failed to compile expressions");
62
63		Self {
64			parent,
65			node,
66			expressions,
67			compiled_expressions,
68			functions,
69			clock,
70		}
71	}
72
73	/// Project all rows in Columns using expressions
74	fn project(&self, columns: &Columns) -> Result<Columns> {
75		let row_count = columns.row_count();
76		if row_count == 0 {
77			return Ok(Columns::empty());
78		}
79
80		let exec_ctx = EvalContext {
81			target: None,
82			columns: columns.clone(),
83			row_count,
84			take: None,
85			params: &EMPTY_PARAMS,
86			symbol_table: &EMPTY_SYMBOL_TABLE,
87			is_aggregate_context: false,
88			functions: &self.functions,
89			clock: &self.clock,
90			arena: None,
91			identity: IdentityId::root(),
92		};
93
94		let mut result_columns = Vec::with_capacity(self.expressions.len());
95
96		for (i, compiled_expr) in self.compiled_expressions.iter().enumerate() {
97			let evaluated_col = compiled_expr.execute(&exec_ctx)?;
98
99			let expr = &self.expressions[i];
100			let field_name = match expr {
101				Expression::Alias(alias_expr) => alias_expr.alias.name().to_string(),
102				Expression::Column(col_expr) => col_expr.0.name.text().to_string(),
103				Expression::AccessSource(access_expr) => access_expr.column.name.text().to_string(),
104				_ => expr.full_fragment_owned().text().to_string(),
105			};
106
107			let named_column = Column {
108				name: Fragment::internal(field_name),
109				data: evaluated_col.data().clone(),
110			};
111
112			result_columns.push(named_column);
113		}
114
115		let row_numbers = if columns.row_numbers.is_empty() {
116			Vec::new()
117		} else {
118			columns.row_numbers.iter().cloned().collect()
119		};
120
121		Ok(Columns::with_row_numbers(result_columns, row_numbers))
122	}
123}
124
125impl Operator for MapOperator {
126	fn id(&self) -> FlowNodeId {
127		self.node
128	}
129
130	fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
131		let mut result = Vec::new();
132
133		for diff in change.diffs.into_iter() {
134			match diff {
135				Diff::Insert {
136					post,
137				} => {
138					let projected = match self.project(&post) {
139						Ok(projected) => projected,
140						Err(err) => {
141							panic!("{:#?}", err)
142						}
143					};
144
145					if !projected.is_empty() {
146						result.push(Diff::Insert {
147							post: projected,
148						});
149					}
150				}
151				Diff::Update {
152					pre,
153					post,
154				} => {
155					let projected_post = self.project(&post)?;
156					let projected_pre = self.project(&pre)?;
157
158					if !projected_post.is_empty() {
159						result.push(Diff::Update {
160							pre: projected_pre,
161							post: projected_post,
162						});
163					}
164				}
165				Diff::Remove {
166					pre,
167				} => {
168					let projected_pre = self.project(&pre)?;
169					if !projected_pre.is_empty() {
170						result.push(Diff::Remove {
171							pre: projected_pre,
172						});
173					}
174				}
175			}
176		}
177
178		Ok(Change::from_flow(self.node, change.version, result))
179	}
180
181	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
182		self.parent.pull(txn, rows)
183	}
184}