reifydb_sub_flow/operator/
map.rs1use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7 interface::{
8 catalog::flow::FlowNodeId,
9 change::{Change, Diff},
10 },
11 value::column::{Column, columns::Columns},
12};
13use reifydb_engine::{
14 expression::{
15 compile::{CompiledExpr, compile_expression},
16 context::{CompileContext, EvalSession},
17 },
18 vm::stack::SymbolTable,
19};
20use reifydb_routine::function::registry::Functions;
21use reifydb_rql::expression::Expression;
22use reifydb_runtime::context::RuntimeContext;
23use reifydb_type::{
24 Result,
25 fragment::Fragment,
26 params::Params,
27 value::{identity::IdentityId, row_number::RowNumber},
28};
29
30use crate::{Operator, operator::Operators, transaction::FlowTransaction};
31
32static EMPTY_PARAMS: Params = Params::None;
34static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
35
36pub struct MapOperator {
37 parent: Arc<Operators>,
38 node: FlowNodeId,
39 expressions: Vec<Expression>,
40 compiled_expressions: Vec<CompiledExpr>,
41 functions: Functions,
42 runtime_context: RuntimeContext,
43}
44
45impl MapOperator {
46 pub fn new(
47 parent: Arc<Operators>,
48 node: FlowNodeId,
49 expressions: Vec<Expression>,
50 functions: Functions,
51 runtime_context: RuntimeContext,
52 ) -> Self {
53 let compile_ctx = CompileContext {
54 functions: &functions,
55 symbols: &EMPTY_SYMBOL_TABLE,
56 };
57 let compiled_expressions: Vec<CompiledExpr> = expressions
58 .iter()
59 .map(|e| compile_expression(&compile_ctx, e))
60 .collect::<Result<Vec<_>>>()
61 .expect("Failed to compile expressions");
62
63 Self {
64 parent,
65 node,
66 expressions,
67 compiled_expressions,
68 functions,
69 runtime_context,
70 }
71 }
72
73 fn project(&self, columns: &Columns) -> Result<Columns> {
75 let row_count = columns.row_count();
76 if row_count == 0 {
77 return Ok(Columns::empty());
78 }
79
80 let session = EvalSession {
81 params: &EMPTY_PARAMS,
82 symbols: &EMPTY_SYMBOL_TABLE,
83 functions: &self.functions,
84 runtime_context: &self.runtime_context,
85 arena: None,
86 identity: IdentityId::root(),
87 is_aggregate_context: false,
88 };
89 let exec_ctx = session.eval(columns.clone(), row_count);
90
91 let mut result_columns = Vec::with_capacity(self.expressions.len());
92
93 for (i, compiled_expr) in self.compiled_expressions.iter().enumerate() {
94 let evaluated_col = compiled_expr.execute(&exec_ctx)?;
95
96 let expr = &self.expressions[i];
97 let field_name = match expr {
98 Expression::Alias(alias_expr) => alias_expr.alias.name().to_string(),
99 Expression::Column(col_expr) => col_expr.0.name.text().to_string(),
100 Expression::AccessSource(access_expr) => access_expr.column.name.text().to_string(),
101 _ => expr.full_fragment_owned().text().to_string(),
102 };
103
104 let named_column = Column {
105 name: Fragment::internal(field_name),
106 data: evaluated_col.data().clone(),
107 };
108
109 result_columns.push(named_column);
110 }
111
112 let row_numbers = if columns.row_numbers.is_empty() {
113 Vec::new()
114 } else {
115 columns.row_numbers.iter().cloned().collect()
116 };
117
118 Ok(Columns::with_row_numbers(result_columns, row_numbers))
119 }
120}
121
122impl Operator for MapOperator {
123 fn id(&self) -> FlowNodeId {
124 self.node
125 }
126
127 fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
128 let mut result = Vec::new();
129
130 for diff in change.diffs.into_iter() {
131 match diff {
132 Diff::Insert {
133 post,
134 } => {
135 let projected = match self.project(&post) {
136 Ok(projected) => projected,
137 Err(err) => {
138 panic!("{:#?}", err)
139 }
140 };
141
142 if !projected.is_empty() {
143 result.push(Diff::Insert {
144 post: projected,
145 });
146 }
147 }
148 Diff::Update {
149 pre,
150 post,
151 } => {
152 let projected_post = self.project(&post)?;
153 let projected_pre = self.project(&pre)?;
154
155 if !projected_post.is_empty() {
156 result.push(Diff::Update {
157 pre: projected_pre,
158 post: projected_post,
159 });
160 }
161 }
162 Diff::Remove {
163 pre,
164 } => {
165 let projected_pre = self.project(&pre)?;
166 if !projected_pre.is_empty() {
167 result.push(Diff::Remove {
168 pre: projected_pre,
169 });
170 }
171 }
172 }
173 }
174
175 Ok(Change::from_flow(self.node, change.version, result))
176 }
177
178 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
179 self.parent.pull(txn, rows)
180 }
181}