1use std::sync::Arc;
5
6use FlowNodeType::{Aggregate, SinkView, SourceInlineData, SourceTable, SourceView};
7use reifydb_catalog::{CatalogTableQueryOperations, CatalogViewQueryOperations, resolve::resolve_view};
8use reifydb_core::{
9 Error,
10 interface::{FlowId, FlowNodeId, SourceId},
11};
12use reifydb_engine::StandardCommandTransaction;
13use reifydb_rql::flow::{
14 Flow, FlowNode, FlowNodeType,
15 FlowNodeType::{Apply, Distinct, Extend, Filter, Join, Map, Sort, Take, Union, Window},
16};
17use reifydb_type::internal;
18
19use super::eval::evaluate_operator_config;
20use crate::{
21 engine::FlowEngine,
22 operator::{
23 ApplyOperator, DistinctOperator, ExtendOperator, FilterOperator, JoinOperator, MapOperator, Operators,
24 SinkViewOperator, SortOperator, SourceTableOperator, SourceViewOperator, TakeOperator, WindowOperator,
25 },
26};
27
28impl FlowEngine {
29 pub fn register(&self, txn: &mut StandardCommandTransaction, flow: Flow) -> crate::Result<()> {
30 debug_assert!(!self.inner.flows.read().contains_key(&flow.id), "Flow already registered");
31
32 for node_id in flow.topological_order()? {
33 let node = flow.get_node(&node_id).unwrap();
34 self.add(txn, &flow, node)?;
35 }
36
37 self.inner.analyzer.write().add(flow.clone());
39 self.inner.flows.write().insert(flow.id, flow);
40
41 Ok(())
42 }
43
44 fn add(&self, txn: &mut StandardCommandTransaction, flow: &Flow, node: &FlowNode) -> crate::Result<()> {
45 debug_assert!(!self.inner.operators.read().contains_key(&node.id), "Operator already registered");
46 let node = node.clone();
47
48 match node.ty {
49 SourceInlineData {
50 ..
51 } => {
52 unimplemented!()
53 }
54 SourceTable {
55 table,
56 } => {
57 let table = txn.get_table(table)?;
58
59 self.add_source(flow.id, node.id, SourceId::table(table.id));
60 self.inner.operators.write().insert(
61 node.id,
62 Arc::new(Operators::SourceTable(SourceTableOperator::new(node.id, table))),
63 );
64 }
65 SourceView {
66 view,
67 } => {
68 let view = txn.get_view(view)?;
69 self.add_source(flow.id, node.id, SourceId::view(view.id));
70 self.inner.operators.write().insert(
71 node.id,
72 Arc::new(Operators::SourceView(SourceViewOperator::new(node.id, view))),
73 );
74 }
75 SinkView {
76 view,
77 } => {
78 let parent = self
79 .inner
80 .operators
81 .read()
82 .get(&node.inputs[0])
83 .ok_or_else(|| Error(internal!("Parent operator not found")))?
84 .clone();
85
86 self.add_sink(flow.id, node.id, SourceId::view(*view));
87 self.inner.operators.write().insert(
88 node.id,
89 Arc::new(Operators::SinkView(SinkViewOperator::new(
90 parent,
91 node.id,
92 resolve_view(txn, view)?,
93 ))),
94 );
95 }
96 Filter {
97 conditions,
98 } => {
99 let parent = self
100 .inner
101 .operators
102 .read()
103 .get(&node.inputs[0])
104 .ok_or_else(|| Error(internal!("Parent operator not found")))?
105 .clone();
106 self.inner.operators.write().insert(
107 node.id,
108 Arc::new(Operators::Filter(FilterOperator::new(parent, node.id, conditions))),
109 );
110 }
111 Map {
112 expressions,
113 } => {
114 let parent = self
115 .inner
116 .operators
117 .read()
118 .get(&node.inputs[0])
119 .ok_or_else(|| Error(internal!("Parent operator not found")))?
120 .clone();
121 self.inner.operators.write().insert(
122 node.id,
123 Arc::new(Operators::Map(MapOperator::new(parent, node.id, expressions))),
124 );
125 }
126 Extend {
127 expressions,
128 } => {
129 let parent = self
130 .inner
131 .operators
132 .read()
133 .get(&node.inputs[0])
134 .ok_or_else(|| Error(internal!("Parent operator not found")))?
135 .clone();
136 self.inner.operators.write().insert(
137 node.id,
138 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
139 );
140 }
141 Sort {
142 by: _,
143 } => {
144 let parent = self
145 .inner
146 .operators
147 .read()
148 .get(&node.inputs[0])
149 .ok_or_else(|| Error(internal!("Parent operator not found")))?
150 .clone();
151 self.inner.operators.write().insert(
152 node.id,
153 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
154 );
155 }
156 Take {
157 limit,
158 } => {
159 let parent = self
160 .inner
161 .operators
162 .read()
163 .get(&node.inputs[0])
164 .ok_or_else(|| Error(internal!("Parent operator not found")))?
165 .clone();
166 self.inner.operators.write().insert(
167 node.id,
168 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
169 );
170 }
171 Join {
172 join_type,
173 left,
174 right,
175 alias,
176 } => {
177 if node.inputs.len() != 2 {
180 return Err(Error(internal!("Join node must have exactly 2 inputs")));
181 }
182
183 let left_node = node.inputs[0];
184 let right_node = node.inputs[1];
185
186 let operators = self.inner.operators.read();
187 let left_parent = operators
188 .get(&left_node)
189 .ok_or_else(|| Error(internal!("Left parent operator not found")))?
190 .clone();
191
192 let right_parent = operators
193 .get(&right_node)
194 .ok_or_else(|| Error(internal!("Right parent operator not found")))?
195 .clone();
196 drop(operators);
197
198 self.inner.operators.write().insert(
199 node.id,
200 Arc::new(Operators::Join(JoinOperator::new(
201 left_parent,
202 right_parent,
203 node.id,
204 join_type,
205 left_node,
206 right_node,
207 left,
208 right,
209 alias,
210 self.inner.executor.clone(),
211 ))),
212 );
213 }
214 Distinct {
215 expressions,
216 } => {
217 let parent = self
218 .inner
219 .operators
220 .read()
221 .get(&node.inputs[0])
222 .ok_or_else(|| Error(internal!("Parent operator not found")))?
223 .clone();
224 self.inner.operators.write().insert(
225 node.id,
226 Arc::new(Operators::Distinct(DistinctOperator::new(
227 parent,
228 node.id,
229 expressions,
230 ))),
231 );
232 }
233 Union {} => unimplemented!(),
235 Apply {
236 operator_name,
237 expressions,
238 } => {
239 let parent = self
240 .inner
241 .operators
242 .read()
243 .get(&node.inputs[0])
244 .ok_or_else(|| Error(internal!("Parent operator not found")))?
245 .clone();
246
247 let operator = if self.is_ffi_operator(operator_name.as_str()) {
249 let config = evaluate_operator_config(
250 expressions.as_slice(),
251 &self.inner.evaluator,
252 )?;
253 self.create_ffi_operator(operator_name.as_str(), node.id, &config)?
254 } else {
255 self.inner.registry.create_operator(
257 operator_name.as_str(),
258 node.id,
259 expressions.as_slice(),
260 )?
261 };
262
263 self.inner.operators.write().insert(
264 node.id,
265 Arc::new(Operators::Apply(ApplyOperator::new(parent, node.id, operator))),
266 );
267 }
268 Aggregate {
269 ..
270 } => unimplemented!(),
271 Window {
272 window_type,
273 size,
274 slide,
275 group_by,
276 aggregations,
277 min_events,
278 max_window_count,
279 max_window_age,
280 } => {
281 let parent = self
282 .inner
283 .operators
284 .read()
285 .get(&node.inputs[0])
286 .ok_or_else(|| Error(internal!("Parent operator not found")))?
287 .clone();
288 let operator = WindowOperator::new(
289 parent,
290 node.id,
291 window_type.clone(),
292 size.clone(),
293 slide.clone(),
294 group_by.clone(),
295 aggregations.clone(),
296 min_events.clone(),
297 max_window_count.clone(),
298 max_window_age.clone(),
299 );
300 self.inner.operators.write().insert(node.id, Arc::new(Operators::Window(operator)));
301 }
302 }
303
304 Ok(())
305 }
306
307 fn add_source(&self, flow: FlowId, node: FlowNodeId, source: SourceId) {
308 let mut sources = self.inner.sources.write();
309 let nodes = sources.entry(source).or_insert_with(Vec::new);
310
311 let entry = (flow, node);
312 if !nodes.contains(&entry) {
313 nodes.push(entry);
314 }
315 }
316
317 fn add_sink(&self, flow: FlowId, node: FlowNodeId, sink: SourceId) {
318 let mut sinks = self.inner.sinks.write();
319 let nodes = sinks.entry(sink).or_insert_with(Vec::new);
320
321 let entry = (flow, node);
322 if !nodes.contains(&entry) {
323 nodes.push(entry);
324 }
325 }
326}