reifydb_sub_flow/operator/
map.rs1use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{
7 interface::{
8 catalog::flow::FlowNodeId,
9 change::{Change, Diff},
10 },
11 value::column::{ColumnWithName, columns::Columns},
12};
13use reifydb_engine::{
14 expression::{
15 compile::{CompiledExpr, compile_expression},
16 context::{CompileContext, EvalContext},
17 },
18 vm::stack::SymbolTable,
19};
20use reifydb_routine::routine::registry::Routines;
21use reifydb_rql::expression::{Expression, name::display_label};
22use reifydb_runtime::context::RuntimeContext;
23use reifydb_type::{
24 Result,
25 fragment::Fragment,
26 params::Params,
27 value::{identity::IdentityId, row_number::RowNumber},
28};
29
30use crate::{Operator, operator::Operators, transaction::FlowTransaction};
31
32static EMPTY_PARAMS: Params = Params::None;
34static EMPTY_SYMBOL_TABLE: LazyLock<SymbolTable> = LazyLock::new(SymbolTable::new);
35
36pub struct MapOperator {
37 parent: Arc<Operators>,
38 node: FlowNodeId,
39 expressions: Vec<Expression>,
40 compiled_expressions: Vec<CompiledExpr>,
41 routines: Routines,
42 runtime_context: RuntimeContext,
43}
44
45impl MapOperator {
46 pub fn new(
47 parent: Arc<Operators>,
48 node: FlowNodeId,
49 expressions: Vec<Expression>,
50 routines: Routines,
51 runtime_context: RuntimeContext,
52 ) -> Self {
53 let compile_ctx = CompileContext {
54 symbols: &EMPTY_SYMBOL_TABLE,
55 };
56 let compiled_expressions: Vec<CompiledExpr> = expressions
57 .iter()
58 .map(|e| compile_expression(&compile_ctx, e))
59 .collect::<Result<Vec<_>>>()
60 .expect("Failed to compile expressions");
61
62 Self {
63 parent,
64 node,
65 expressions,
66 compiled_expressions,
67 routines,
68 runtime_context,
69 }
70 }
71
72 fn project(&self, columns: &Columns) -> Result<Columns> {
74 let row_count = columns.row_count();
75 if row_count == 0 {
76 return Ok(Columns::empty());
77 }
78
79 let session = EvalContext {
80 params: &EMPTY_PARAMS,
81 symbols: &EMPTY_SYMBOL_TABLE,
82 routines: &self.routines,
83 runtime_context: &self.runtime_context,
84 arena: None,
85 identity: IdentityId::root(),
86 is_aggregate_context: false,
87 columns: Columns::empty(),
88 row_count: 1,
89 target: None,
90 take: None,
91 };
92 let exec_ctx = session.with_eval(columns.clone(), row_count);
93
94 let mut result_columns = Vec::with_capacity(self.expressions.len());
95
96 for (i, compiled_expr) in self.compiled_expressions.iter().enumerate() {
97 let evaluated_col = compiled_expr.execute(&exec_ctx)?;
98
99 let expr = &self.expressions[i];
100 let field_name = display_label(expr).text().to_string();
101
102 let named_column =
103 ColumnWithName::new(Fragment::internal(field_name), evaluated_col.data().clone());
104
105 result_columns.push(named_column);
106 }
107
108 let row_numbers = if columns.row_numbers.is_empty() {
109 Vec::new()
110 } else {
111 columns.row_numbers.iter().cloned().collect()
112 };
113
114 Ok(Columns::with_system_columns(
115 result_columns,
116 row_numbers,
117 columns.created_at.to_vec(),
118 columns.updated_at.to_vec(),
119 ))
120 }
121}
122
123impl Operator for MapOperator {
124 fn id(&self) -> FlowNodeId {
125 self.node
126 }
127
128 fn apply(&self, _txn: &mut FlowTransaction, change: Change) -> Result<Change> {
129 let mut result = Vec::new();
130
131 for diff in change.diffs.into_iter() {
132 match diff {
133 Diff::Insert {
134 post,
135 } => {
136 let projected = match self.project(&post) {
137 Ok(projected) => projected,
138 Err(err) => {
139 panic!("{:#?}", err)
140 }
141 };
142
143 if !projected.is_empty() {
144 result.push(Diff::insert(projected));
145 }
146 }
147 Diff::Update {
148 pre,
149 post,
150 } => {
151 let projected_post = self.project(&post)?;
152 let projected_pre = self.project(&pre)?;
153
154 if !projected_post.is_empty() {
155 result.push(Diff::update(projected_pre, projected_post));
156 }
157 }
158 Diff::Remove {
159 pre,
160 } => {
161 let projected_pre = self.project(&pre)?;
162 if !projected_pre.is_empty() {
163 result.push(Diff::remove(projected_pre));
164 }
165 }
166 }
167 }
168
169 Ok(Change::from_flow(self.node, change.version, result, change.changed_at))
170 }
171
172 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
173 self.parent.pull(txn, rows)
174 }
175}