Skip to main content

reifydb_sub_flow/operator/
map.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7	interface::{
8		catalog::flow::FlowNodeId,
9		change::{Change, Diff},
10	},
11	value::column::{ColumnWithName, columns::Columns},
12};
13use reifydb_engine::{
14	expression::{
15		compile::{CompiledExpr, compile_expression},
16		context::{CompileContext, EvalContext},
17	},
18	vm::stack::SymbolTable,
19};
20use reifydb_routine::routine::registry::Routines;
21use reifydb_rql::expression::{Expression, name::display_label};
22use reifydb_runtime::context::RuntimeContext;
23use reifydb_type::{
24	Result,
25	fragment::Fragment,
26	params::Params,
27	value::{identity::IdentityId, row_number::RowNumber},
28};
29
30use crate::{Operator, operator::Operators, transaction::FlowTransaction};
31
32// Static empty params instance for use in EvaluationContext
33static EMPTY_PARAMS: Params = Params::None;
34static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
35
36pub struct MapOperator {
37	parent: Arc<Operators>,
38	node: FlowNodeId,
39	expressions: Vec<Expression>,
40	compiled_expressions: Vec<CompiledExpr>,
41	routines: Routines,
42	runtime_context: RuntimeContext,
43}
44
45impl MapOperator {
46	pub fn new(
47		parent: Arc<Operators>,
48		node: FlowNodeId,
49		expressions: Vec<Expression>,
50		routines: Routines,
51		runtime_context: RuntimeContext,
52	) -> Self {
53		let compile_ctx = CompileContext {
54			symbols: &EMPTY_SYMBOL_TABLE,
55		};
56		let compiled_expressions: Vec<CompiledExpr> = expressions
57			.iter()
58			.map(|e| compile_expression(&compile_ctx, e))
59			.collect::<Result<Vec<_>>>()
60			.expect("Failed to compile expressions");
61
62		Self {
63			parent,
64			node,
65			expressions,
66			compiled_expressions,
67			routines,
68			runtime_context,
69		}
70	}
71
72	/// Project all rows in Columns using expressions
73	fn project(&self, columns: &Columns) -> Result<Columns> {
74		let row_count = columns.row_count();
75		if row_count == 0 {
76			return Ok(Columns::empty());
77		}
78
79		let session = EvalContext {
80			params: &EMPTY_PARAMS,
81			symbols: &EMPTY_SYMBOL_TABLE,
82			routines: &self.routines,
83			runtime_context: &self.runtime_context,
84			arena: None,
85			identity: IdentityId::root(),
86			is_aggregate_context: false,
87			columns: Columns::empty(),
88			row_count: 1,
89			target: None,
90			take: None,
91		};
92		let exec_ctx = session.with_eval(columns.clone(), row_count);
93
94		let mut result_columns = Vec::with_capacity(self.expressions.len());
95
96		for (i, compiled_expr) in self.compiled_expressions.iter().enumerate() {
97			let evaluated_col = compiled_expr.execute(&exec_ctx)?;
98
99			let expr = &self.expressions[i];
100			let field_name = display_label(expr).text().to_string();
101
102			let named_column =
103				ColumnWithName::new(Fragment::internal(field_name), evaluated_col.data().clone());
104
105			result_columns.push(named_column);
106		}
107
108		let row_numbers = if columns.row_numbers.is_empty() {
109			Vec::new()
110		} else {
111			columns.row_numbers.iter().cloned().collect()
112		};
113
114		Ok(Columns::with_system_columns(
115			result_columns,
116			row_numbers,
117			columns.created_at.to_vec(),
118			columns.updated_at.to_vec(),
119		))
120	}
121}
122
123impl Operator for MapOperator {
124	fn id(&self) -> FlowNodeId {
125		self.node
126	}
127
128	fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
129		let mut result = Vec::new();
130
131		for diff in change.diffs.into_iter() {
132			match diff {
133				Diff::Insert {
134					post,
135				} => {
136					let projected = match self.project(&post) {
137						Ok(projected) => projected,
138						Err(err) => {
139							panic!("{:#?}", err)
140						}
141					};
142
143					if !projected.is_empty() {
144						result.push(Diff::insert(projected));
145					}
146				}
147				Diff::Update {
148					pre,
149					post,
150				} => {
151					let projected_post = self.project(&post)?;
152					let projected_pre = self.project(&pre)?;
153
154					if !projected_post.is_empty() {
155						result.push(Diff::update(projected_pre, projected_post));
156					}
157				}
158				Diff::Remove {
159					pre,
160				} => {
161					let projected_pre = self.project(&pre)?;
162					if !projected_pre.is_empty() {
163						result.push(Diff::remove(projected_pre));
164					}
165				}
166			}
167		}
168
169		Ok(Change::from_flow(self.node, change.version, result, change.changed_at))
170	}
171
172	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
173		self.parent.pull(txn, rows)
174	}
175}