reifydb_sub_flow/operator/
map.rs1use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7 interface::{
8 catalog::flow::FlowNodeId,
9 change::{Change, Diff},
10 },
11 value::column::{Column, columns::Columns},
12};
13use reifydb_engine::{
14 expression::{
15 compile::{CompiledExpr, compile_expression},
16 context::{CompileContext, EvalContext},
17 },
18 vm::stack::SymbolTable,
19};
20use reifydb_function::registry::Functions;
21use reifydb_rql::expression::Expression;
22use reifydb_runtime::clock::Clock;
23use reifydb_type::{fragment::Fragment, params::Params, value::row_number::RowNumber};
24
25use crate::{Operator, operator::Operators, transaction::FlowTransaction};
26
27static EMPTY_PARAMS: Params = Params::None;
29static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
30
31pub struct MapOperator {
32 parent: Arc<Operators>,
33 node: FlowNodeId,
34 expressions: Vec<Expression>,
35 compiled_expressions: Vec<CompiledExpr>,
36 functions: Functions,
37 clock: Clock,
38}
39
40impl MapOperator {
41 pub fn new(
42 parent: Arc<Operators>,
43 node: FlowNodeId,
44 expressions: Vec<Expression>,
45 functions: Functions,
46 clock: Clock,
47 ) -> Self {
48 let compile_ctx = CompileContext {
49 functions: &functions,
50 symbol_table: &EMPTY_SYMBOL_TABLE,
51 };
52 let compiled_expressions: Vec<CompiledExpr> = expressions
53 .iter()
54 .map(|e| compile_expression(&compile_ctx, e))
55 .collect::<Result<Vec<_>, _>>()
56 .expect("Failed to compile expressions");
57
58 Self {
59 parent,
60 node,
61 expressions,
62 compiled_expressions,
63 functions,
64 clock,
65 }
66 }
67
68 fn project(&self, columns: &Columns) -> reifydb_type::Result<Columns> {
70 let row_count = columns.row_count();
71 if row_count == 0 {
72 return Ok(Columns::empty());
73 }
74
75 let exec_ctx = EvalContext {
76 target: None,
77 columns: columns.clone(),
78 row_count,
79 take: None,
80 params: &EMPTY_PARAMS,
81 symbol_table: &EMPTY_SYMBOL_TABLE,
82 is_aggregate_context: false,
83 functions: &self.functions,
84 clock: &self.clock,
85 arena: None,
86 };
87
88 let mut result_columns = Vec::with_capacity(self.expressions.len());
89
90 for (i, compiled_expr) in self.compiled_expressions.iter().enumerate() {
91 let evaluated_col = compiled_expr.execute(&exec_ctx)?;
92
93 let expr = &self.expressions[i];
94 let field_name = match expr {
95 Expression::Alias(alias_expr) => alias_expr.alias.name().to_string(),
96 Expression::Column(col_expr) => col_expr.0.name.text().to_string(),
97 Expression::AccessSource(access_expr) => access_expr.column.name.text().to_string(),
98 _ => expr.full_fragment_owned().text().to_string(),
99 };
100
101 let named_column = Column {
102 name: Fragment::internal(field_name),
103 data: evaluated_col.data().clone(),
104 };
105
106 result_columns.push(named_column);
107 }
108
109 let row_numbers = if columns.row_numbers.is_empty() {
110 Vec::new()
111 } else {
112 columns.row_numbers.iter().cloned().collect()
113 };
114
115 Ok(Columns::with_row_numbers(result_columns, row_numbers))
116 }
117}
118
119impl Operator for MapOperator {
120 fn id(&self) -> FlowNodeId {
121 self.node
122 }
123
124 fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> reifydb_type::Result<Change> {
125 let mut result = Vec::new();
126
127 for diff in change.diffs.into_iter() {
128 match diff {
129 Diff::Insert {
130 post,
131 } => {
132 let projected = match self.project(&post) {
133 Ok(projected) => projected,
134 Err(err) => {
135 panic!("{:#?}", err)
136 }
137 };
138
139 if !projected.is_empty() {
140 result.push(Diff::Insert {
141 post: projected,
142 });
143 }
144 }
145 Diff::Update {
146 pre,
147 post,
148 } => {
149 let projected_post = self.project(&post)?;
150 let projected_pre = self.project(&pre)?;
151
152 if !projected_post.is_empty() {
153 result.push(Diff::Update {
154 pre: projected_pre,
155 post: projected_post,
156 });
157 }
158 }
159 Diff::Remove {
160 pre,
161 } => {
162 let projected_pre = self.project(&pre)?;
163 if !projected_pre.is_empty() {
164 result.push(Diff::Remove {
165 pre: projected_pre,
166 });
167 }
168 }
169 }
170 }
171
172 Ok(Change::from_flow(self.node, change.version, result))
173 }
174
175 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> reifydb_type::Result<Columns> {
176 self.parent.pull(txn, rows)
177 }
178}