reifydb_sub_flow/operator/
map.rs1use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7 interface::{
8 catalog::flow::FlowNodeId,
9 change::{Change, Diff},
10 },
11 value::column::{Column, columns::Columns},
12};
13use reifydb_engine::{
14 expression::{
15 compile::{CompiledExpr, compile_expression},
16 context::{CompileContext, EvalContext},
17 },
18 vm::stack::SymbolTable,
19};
20use reifydb_function::registry::Functions;
21use reifydb_rql::expression::Expression;
22use reifydb_runtime::clock::Clock;
23use reifydb_type::{
24 Result,
25 fragment::Fragment,
26 params::Params,
27 value::{identity::IdentityId, row_number::RowNumber},
28};
29
30use crate::{Operator, operator::Operators, transaction::FlowTransaction};
31
32static EMPTY_PARAMS: Params = Params::None;
34static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(|| SymbolTable::new());
35
36pub struct MapOperator {
37 parent: Arc<Operators>,
38 node: FlowNodeId,
39 expressions: Vec<Expression>,
40 compiled_expressions: Vec<CompiledExpr>,
41 functions: Functions,
42 clock: Clock,
43}
44
45impl MapOperator {
46 pub fn new(
47 parent: Arc<Operators>,
48 node: FlowNodeId,
49 expressions: Vec<Expression>,
50 functions: Functions,
51 clock: Clock,
52 ) -> Self {
53 let compile_ctx = CompileContext {
54 functions: &functions,
55 symbol_table: &EMPTY_SYMBOL_TABLE,
56 };
57 let compiled_expressions: Vec<CompiledExpr> = expressions
58 .iter()
59 .map(|e| compile_expression(&compile_ctx, e))
60 .collect::<Result<Vec<_>>>()
61 .expect("Failed to compile expressions");
62
63 Self {
64 parent,
65 node,
66 expressions,
67 compiled_expressions,
68 functions,
69 clock,
70 }
71 }
72
73 fn project(&self, columns: &Columns) -> Result<Columns> {
75 let row_count = columns.row_count();
76 if row_count == 0 {
77 return Ok(Columns::empty());
78 }
79
80 let exec_ctx = EvalContext {
81 target: None,
82 columns: columns.clone(),
83 row_count,
84 take: None,
85 params: &EMPTY_PARAMS,
86 symbol_table: &EMPTY_SYMBOL_TABLE,
87 is_aggregate_context: false,
88 functions: &self.functions,
89 clock: &self.clock,
90 arena: None,
91 identity: IdentityId::root(),
92 };
93
94 let mut result_columns = Vec::with_capacity(self.expressions.len());
95
96 for (i, compiled_expr) in self.compiled_expressions.iter().enumerate() {
97 let evaluated_col = compiled_expr.execute(&exec_ctx)?;
98
99 let expr = &self.expressions[i];
100 let field_name = match expr {
101 Expression::Alias(alias_expr) => alias_expr.alias.name().to_string(),
102 Expression::Column(col_expr) => col_expr.0.name.text().to_string(),
103 Expression::AccessSource(access_expr) => access_expr.column.name.text().to_string(),
104 _ => expr.full_fragment_owned().text().to_string(),
105 };
106
107 let named_column = Column {
108 name: Fragment::internal(field_name),
109 data: evaluated_col.data().clone(),
110 };
111
112 result_columns.push(named_column);
113 }
114
115 let row_numbers = if columns.row_numbers.is_empty() {
116 Vec::new()
117 } else {
118 columns.row_numbers.iter().cloned().collect()
119 };
120
121 Ok(Columns::with_row_numbers(result_columns, row_numbers))
122 }
123}
124
125impl Operator for MapOperator {
126 fn id(&self) -> FlowNodeId {
127 self.node
128 }
129
130 fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
131 let mut result = Vec::new();
132
133 for diff in change.diffs.into_iter() {
134 match diff {
135 Diff::Insert {
136 post,
137 } => {
138 let projected = match self.project(&post) {
139 Ok(projected) => projected,
140 Err(err) => {
141 panic!("{:#?}", err)
142 }
143 };
144
145 if !projected.is_empty() {
146 result.push(Diff::Insert {
147 post: projected,
148 });
149 }
150 }
151 Diff::Update {
152 pre,
153 post,
154 } => {
155 let projected_post = self.project(&post)?;
156 let projected_pre = self.project(&pre)?;
157
158 if !projected_post.is_empty() {
159 result.push(Diff::Update {
160 pre: projected_pre,
161 post: projected_post,
162 });
163 }
164 }
165 Diff::Remove {
166 pre,
167 } => {
168 let projected_pre = self.project(&pre)?;
169 if !projected_pre.is_empty() {
170 result.push(Diff::Remove {
171 pre: projected_pre,
172 });
173 }
174 }
175 }
176 }
177
178 Ok(Change::from_flow(self.node, change.version, result))
179 }
180
181 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
182 self.parent.pull(txn, rows)
183 }
184}