1use std::sync::Arc;
5
6use reifydb_core::{
7 interface::catalog::{
8 flow::{FlowId, FlowNodeId},
9 primitive::PrimitiveId,
10 },
11 internal,
12};
13use reifydb_rql::flow::{
14 flow::FlowDag,
15 node::{
16 FlowNode,
17 FlowNodeType::{
18 Aggregate, Append, Apply, Distinct, Extend, Filter, Join, Map, SinkSubscription, SinkView,
19 Sort, SourceFlow, SourceInlineData, SourceTable, SourceView, Take, Window,
20 },
21 },
22};
23use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
24use reifydb_type::error::Error;
25use tracing::instrument;
26
27use super::eval::evaluate_operator_config;
28use crate::{
29 engine::FlowEngine,
30 operator::{
31 Operators,
32 append::AppendOperator,
33 apply::ApplyOperator,
34 distinct::DistinctOperator,
35 extend::ExtendOperator,
36 filter::FilterOperator,
37 join::operator::JoinOperator,
38 map::MapOperator,
39 scan::{flow::PrimitiveFlowOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator},
40 sink::{subscription::SinkSubscriptionOperator, view::SinkViewOperator},
41 sort::SortOperator,
42 take::TakeOperator,
43 window::WindowOperator,
44 },
45};
46
47impl FlowEngine {
48 #[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
49 pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> reifydb_type::Result<()> {
50 debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
51
52 for node_id in flow.topological_order()? {
53 let node = flow.get_node(&node_id).unwrap();
54 self.add(txn, &flow, node)?;
55 }
56
57 self.analyzer.add(flow.clone());
58 self.flows.insert(flow.id, flow);
59
60 Ok(())
61 }
62
63 #[instrument(name = "flow::register::add_node", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?std::mem::discriminant(&node.ty)))]
64 fn add(&mut self, txn: &mut CommandTransaction, flow: &FlowDag, node: &FlowNode) -> reifydb_type::Result<()> {
65 debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
66 let node = node.clone();
67
68 match node.ty {
69 SourceInlineData {
70 ..
71 } => {
72 unimplemented!()
73 }
74 SourceTable {
75 table,
76 } => {
77 let table = self.catalog.get_table(&mut Transaction::Command(&mut *txn), table)?;
78
79 self.add_source(flow.id, node.id, PrimitiveId::table(table.id));
80 self.operators.insert(
81 node.id,
82 Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
83 );
84 }
85 SourceView {
86 view,
87 } => {
88 let view = self.catalog.get_view(&mut Transaction::Command(&mut *txn), view)?;
89 self.add_source(flow.id, node.id, PrimitiveId::view(view.id));
90 self.operators.insert(
91 node.id,
92 Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))),
93 );
94 }
95 SourceFlow {
96 flow: source_flow,
97 } => {
98 let source_flow_def =
99 self.catalog.get_flow(&mut Transaction::Command(&mut *txn), source_flow)?;
100 self.add_source(flow.id, node.id, PrimitiveId::flow(source_flow_def.id));
101 self.operators.insert(
102 node.id,
103 Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
104 node.id,
105 source_flow_def,
106 ))),
107 );
108 }
109 SinkView {
110 view,
111 } => {
112 let parent = self
113 .operators
114 .get(&node.inputs[0])
115 .ok_or_else(|| Error(internal!("Parent operator not found")))?
116 .clone();
117
118 self.add_sink(flow.id, node.id, PrimitiveId::view(*view));
119 let resolved = self.catalog.resolve_view(&mut Transaction::Command(&mut *txn), view)?;
120 self.operators.insert(
121 node.id,
122 Arc::new(Operators::SinkView(SinkViewOperator::new(parent, node.id, resolved))),
123 );
124 }
125 SinkSubscription {
126 subscription,
127 } => {
128 if node.inputs.is_empty() {
130 return Err(Error(internal!(
131 "SinkSubscription node has no inputs - flow may have been deleted during loading"
132 )));
133 }
134 let parent = self
135 .operators
136 .get(&node.inputs[0])
137 .ok_or_else(|| Error(internal!("Parent operator not found")))?
138 .clone();
139
140 let resolved = self
143 .catalog
144 .resolve_subscription(&mut Transaction::Command(&mut *txn), subscription)?;
145 self.operators.insert(
146 node.id,
147 Arc::new(Operators::SinkSubscription(SinkSubscriptionOperator::new(
148 parent, node.id, resolved,
149 ))),
150 );
151 }
152 Filter {
153 conditions,
154 } => {
155 let parent = self
156 .operators
157 .get(&node.inputs[0])
158 .ok_or_else(|| Error(internal!("Parent operator not found")))?
159 .clone();
160 self.operators.insert(
161 node.id,
162 Arc::new(Operators::Filter(FilterOperator::new(
163 parent,
164 node.id,
165 conditions,
166 self.executor.functions.clone(),
167 self.clock.clone(),
168 ))),
169 );
170 }
171 Map {
172 expressions,
173 } => {
174 let parent = self
175 .operators
176 .get(&node.inputs[0])
177 .ok_or_else(|| Error(internal!("Parent operator not found")))?
178 .clone();
179 self.operators.insert(
180 node.id,
181 Arc::new(Operators::Map(MapOperator::new(
182 parent,
183 node.id,
184 expressions,
185 self.executor.functions.clone(),
186 self.clock.clone(),
187 ))),
188 );
189 }
190 Extend {
191 expressions,
192 } => {
193 let parent = self
194 .operators
195 .get(&node.inputs[0])
196 .ok_or_else(|| Error(internal!("Parent operator not found")))?
197 .clone();
198 self.operators.insert(
199 node.id,
200 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
201 );
202 }
203 Sort {
204 by: _,
205 } => {
206 let parent = self
207 .operators
208 .get(&node.inputs[0])
209 .ok_or_else(|| Error(internal!("Parent operator not found")))?
210 .clone();
211 self.operators.insert(
212 node.id,
213 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
214 );
215 }
216 Take {
217 limit,
218 } => {
219 let parent = self
220 .operators
221 .get(&node.inputs[0])
222 .ok_or_else(|| Error(internal!("Parent operator not found")))?
223 .clone();
224 self.operators.insert(
225 node.id,
226 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
227 );
228 }
229 Join {
230 join_type,
231 left,
232 right,
233 alias,
234 } => {
235 if node.inputs.len() != 2 {
237 return Err(Error(internal!("Join node must have exactly 2 inputs")));
238 }
239
240 let left_node = node.inputs[0];
241 let right_node = node.inputs[1];
242
243 let left_parent = self
244 .operators
245 .get(&left_node)
246 .ok_or_else(|| Error(internal!("Left parent operator not found")))?
247 .clone();
248
249 let right_parent = self
250 .operators
251 .get(&right_node)
252 .ok_or_else(|| Error(internal!("Right parent operator not found")))?
253 .clone();
254
255 self.operators.insert(
256 node.id,
257 Arc::new(Operators::Join(JoinOperator::new(
258 left_parent,
259 right_parent,
260 node.id,
261 join_type,
262 left_node,
263 right_node,
264 left,
265 right,
266 alias,
267 self.executor.clone(),
268 ))),
269 );
270 }
271 Distinct {
272 expressions,
273 } => {
274 let parent = self
275 .operators
276 .get(&node.inputs[0])
277 .ok_or_else(|| Error(internal!("Parent operator not found")))?
278 .clone();
279 self.operators.insert(
280 node.id,
281 Arc::new(Operators::Distinct(DistinctOperator::new(
282 parent,
283 node.id,
284 expressions,
285 self.executor.functions.clone(),
286 self.clock.clone(),
287 ))),
288 );
289 }
290 Append {} => {
291 if node.inputs.len() < 2 {
293 return Err(Error(internal!("Append node must have at least 2 inputs")));
294 }
295
296 let mut parents = Vec::with_capacity(node.inputs.len());
297
298 for input_node_id in &node.inputs {
299 let parent = self
300 .operators
301 .get(input_node_id)
302 .ok_or_else(|| {
303 Error(internal!(
304 "Parent operator not found for input {:?}",
305 input_node_id
306 ))
307 })?
308 .clone();
309 parents.push(parent);
310 }
311
312 self.operators.insert(
313 node.id,
314 Arc::new(Operators::Append(AppendOperator::new(
315 node.id,
316 parents,
317 node.inputs.clone(),
318 ))),
319 );
320 }
321 Apply {
322 operator,
323 expressions,
324 } => {
325 #[cfg(reifydb_target = "native")]
326 {
327 let parent = self
328 .operators
329 .get(&node.inputs[0])
330 .ok_or_else(|| Error(internal!("Parent operator not found")))?
331 .clone();
332
333 if !self.is_ffi_operator(operator.as_str()) {
334 unimplemented!("only ffi operators can be used")
335 }
336
337 let config = evaluate_operator_config(
338 expressions.as_slice(),
339 &self.executor.functions,
340 &self.clock,
341 )?;
342 let operator = self.create_ffi_operator(operator.as_str(), node.id, &config)?;
343
344 self.operators.insert(
345 node.id,
346 Arc::new(Operators::Apply(ApplyOperator::new(
347 parent, node.id, operator,
348 ))),
349 );
350 }
351 #[cfg(not(reifydb_target = "native"))]
352 {
353 let _ = (operator, expressions);
354 return Err(Error(internal!("FFI operators are not supported in WASM")));
355 }
356 }
357 Aggregate {
358 ..
359 } => unimplemented!(),
360 Window {
361 window_type,
362 size,
363 slide,
364 group_by,
365 aggregations,
366 min_events,
367 max_window_count,
368 max_window_age,
369 } => {
370 let parent = self
371 .operators
372 .get(&node.inputs[0])
373 .ok_or_else(|| Error(internal!("Parent operator not found")))?
374 .clone();
375 let operator = WindowOperator::new(
376 parent,
377 node.id,
378 window_type.clone(),
379 size.clone(),
380 slide.clone(),
381 group_by.clone(),
382 aggregations.clone(),
383 min_events.clone(),
384 max_window_count.clone(),
385 max_window_age.clone(),
386 self.clock.clone(),
387 self.executor.functions.clone(),
388 );
389 self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
390 }
391 }
392
393 Ok(())
394 }
395
396 fn add_source(&mut self, flow: FlowId, node: FlowNodeId, source: PrimitiveId) {
397 let nodes = self.sources.entry(source).or_insert_with(Vec::new);
398
399 let entry = (flow, node);
400 if !nodes.contains(&entry) {
401 nodes.push(entry);
402 }
403 }
404
405 fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: PrimitiveId) {
406 let nodes = self.sinks.entry(sink).or_insert_with(Vec::new);
407
408 let entry = (flow, node);
409 if !nodes.contains(&entry) {
410 nodes.push(entry);
411 }
412 }
413}