1use std::{mem, sync::Arc};
5
6use postcard::from_bytes;
7use reifydb_core::{
8 interface::catalog::{
9 flow::{FlowId, FlowNodeId},
10 primitive::PrimitiveId,
11 view::ViewKind,
12 },
13 internal,
14};
15use reifydb_rql::flow::{
16 flow::FlowDag,
17 node::{
18 FlowNode,
19 FlowNodeType::{
20 self, Aggregate, Append, Apply, Distinct, Extend, Filter, Gate, Join, Map, SinkSubscription,
21 SinkView, Sort, SourceFlow, SourceInlineData, SourceRingBuffer, SourceSeries, SourceTable,
22 SourceView, Take, Window,
23 },
24 },
25};
26use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
27use reifydb_type::{Result, error::Error};
28use tracing::instrument;
29
30use super::eval::evaluate_operator_config;
31#[cfg(reifydb_target = "native")]
32use crate::operator::apply::ApplyOperator;
33use crate::{
34 engine::FlowEngine,
35 operator::{
36 Operators,
37 append::AppendOperator,
38 distinct::DistinctOperator,
39 extend::ExtendOperator,
40 filter::FilterOperator,
41 gate::GateOperator,
42 join::operator::JoinOperator,
43 map::MapOperator,
44 scan::{
45 flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator,
46 series::PrimitiveSeriesOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator,
47 },
48 sink::{subscription::SinkSubscriptionOperator, view::SinkViewOperator},
49 sort::SortOperator,
50 take::TakeOperator,
51 window::WindowOperator,
52 },
53};
54
55impl FlowEngine {
56 #[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
57 pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> Result<()> {
58 self.register_with_transaction(&mut Transaction::Command(txn), flow)
59 }
60
61 #[instrument(name = "flow::register_with_transaction", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
62 pub fn register_with_transaction(&mut self, txn: &mut Transaction<'_>, flow: FlowDag) -> Result<()> {
63 debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
64
65 for node_id in flow.topological_order()? {
66 let node = flow.get_node(&node_id).unwrap();
67 self.add(txn, &flow, node)?;
68 }
69
70 self.analyzer.add(flow.clone());
71 self.flows.insert(flow.id, flow);
72
73 Ok(())
74 }
75
76 #[instrument(name = "flow::add", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?mem::discriminant(&node.ty)))]
77 fn add(&mut self, txn: &mut Transaction<'_>, flow: &FlowDag, node: &FlowNode) -> Result<()> {
78 debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
79 let node = node.clone();
80
81 match node.ty {
82 SourceInlineData {
83 ..
84 } => {
85 unimplemented!()
86 }
87 SourceTable {
88 table,
89 } => {
90 let table = self.catalog.get_table(&mut txn.reborrow(), table)?;
91
92 self.add_source(flow.id, node.id, PrimitiveId::table(table.id));
93 self.operators.insert(
94 node.id,
95 Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
96 );
97 }
98 SourceView {
99 view,
100 } => {
101 let view = self.catalog.get_view(&mut txn.reborrow(), view)?;
102 self.add_source(flow.id, node.id, PrimitiveId::view(view.id));
103
104 if view.kind == ViewKind::Transactional {
110 let mut additional_sources = Vec::new();
111 if let Some(view_flow) = self.catalog.find_flow_by_name(
112 &mut txn.reborrow(),
113 view.namespace,
114 &view.name,
115 )? {
116 let flow_nodes = self
117 .catalog
118 .list_flow_nodes_by_flow(&mut txn.reborrow(), view_flow.id)?;
119 for flow_node in &flow_nodes {
120 if flow_node.node_type == 1
122 || flow_node.node_type == 17 || flow_node.node_type == 18
123 {
124 if let Ok(nt) =
125 from_bytes::<FlowNodeType>(&flow_node.data)
126 {
127 match nt {
128 SourceTable {
129 table: t,
130 } => {
131 additional_sources.push(
132 PrimitiveId::table(t),
133 );
134 }
135 SourceRingBuffer {
136 ringbuffer: rb,
137 } => {
138 additional_sources.push(
139 PrimitiveId::ringbuffer(
140 rb,
141 ),
142 );
143 }
144 SourceSeries {
145 series: s,
146 } => {
147 additional_sources.push(
148 PrimitiveId::series(s),
149 );
150 }
151 _ => {}
152 }
153 }
154 }
155 }
156 }
157 for source in additional_sources {
158 self.add_source(flow.id, node.id, source);
159 }
160 }
161
162 self.operators.insert(
163 node.id,
164 Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))),
165 );
166 }
167 SourceFlow {
168 flow: source_flow,
169 } => {
170 let source_flow_def = self.catalog.get_flow(&mut txn.reborrow(), source_flow)?;
171 self.operators.insert(
172 node.id,
173 Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
174 node.id,
175 source_flow_def,
176 ))),
177 );
178 }
179 SourceRingBuffer {
180 ringbuffer,
181 } => {
182 let rb = self.catalog.get_ringbuffer(&mut txn.reborrow(), ringbuffer)?;
183 self.add_source(flow.id, node.id, PrimitiveId::ringbuffer(rb.id));
184 self.operators.insert(
185 node.id,
186 Arc::new(Operators::SourceRingBuffer(PrimitiveRingBufferOperator::new(
187 node.id, rb,
188 ))),
189 );
190 }
191 SourceSeries {
192 series,
193 } => {
194 let s = self.catalog.get_series(&mut txn.reborrow(), series)?;
195 self.add_source(flow.id, node.id, PrimitiveId::series(s.id));
196 self.operators.insert(
197 node.id,
198 Arc::new(Operators::SourceSeries(PrimitiveSeriesOperator::new(node.id, s))),
199 );
200 }
201 SinkView {
202 view,
203 } => {
204 let parent = self
205 .operators
206 .get(&node.inputs[0])
207 .ok_or_else(|| Error(internal!("Parent operator not found")))?
208 .clone();
209
210 self.add_sink(flow.id, node.id, PrimitiveId::view(*view));
211 let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
212 self.operators.insert(
213 node.id,
214 Arc::new(Operators::SinkView(SinkViewOperator::new(parent, node.id, resolved))),
215 );
216 }
217 SinkSubscription {
218 subscription,
219 } => {
220 if node.inputs.is_empty() {
222 return Err(Error(internal!(
223 "SinkSubscription node has no inputs - flow may have been deleted during loading"
224 )));
225 }
226 let parent = self
227 .operators
228 .get(&node.inputs[0])
229 .ok_or_else(|| Error(internal!("Parent operator not found")))?
230 .clone();
231
232 let resolved = self.catalog.resolve_subscription(&mut txn.reborrow(), subscription)?;
235 self.operators.insert(
236 node.id,
237 Arc::new(Operators::SinkSubscription(SinkSubscriptionOperator::new(
238 parent, node.id, resolved,
239 ))),
240 );
241 }
242 Filter {
243 conditions,
244 } => {
245 let parent = self
246 .operators
247 .get(&node.inputs[0])
248 .ok_or_else(|| Error(internal!("Parent operator not found")))?
249 .clone();
250 self.operators.insert(
251 node.id,
252 Arc::new(Operators::Filter(FilterOperator::new(
253 parent,
254 node.id,
255 conditions,
256 self.executor.functions.clone(),
257 self.clock.clone(),
258 ))),
259 );
260 }
261 Gate {
262 conditions,
263 } => {
264 let parent = self
265 .operators
266 .get(&node.inputs[0])
267 .ok_or_else(|| Error(internal!("Parent operator not found")))?
268 .clone();
269 self.operators.insert(
270 node.id,
271 Arc::new(Operators::Gate(GateOperator::new(
272 parent,
273 node.id,
274 conditions,
275 self.executor.functions.clone(),
276 self.clock.clone(),
277 ))),
278 );
279 }
280 Map {
281 expressions,
282 } => {
283 let parent = self
284 .operators
285 .get(&node.inputs[0])
286 .ok_or_else(|| Error(internal!("Parent operator not found")))?
287 .clone();
288 self.operators.insert(
289 node.id,
290 Arc::new(Operators::Map(MapOperator::new(
291 parent,
292 node.id,
293 expressions,
294 self.executor.functions.clone(),
295 self.clock.clone(),
296 ))),
297 );
298 }
299 Extend {
300 expressions,
301 } => {
302 let parent = self
303 .operators
304 .get(&node.inputs[0])
305 .ok_or_else(|| Error(internal!("Parent operator not found")))?
306 .clone();
307 self.operators.insert(
308 node.id,
309 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
310 );
311 }
312 Sort {
313 by: _,
314 } => {
315 let parent = self
316 .operators
317 .get(&node.inputs[0])
318 .ok_or_else(|| Error(internal!("Parent operator not found")))?
319 .clone();
320 self.operators.insert(
321 node.id,
322 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
323 );
324 }
325 Take {
326 limit,
327 } => {
328 let parent = self
329 .operators
330 .get(&node.inputs[0])
331 .ok_or_else(|| Error(internal!("Parent operator not found")))?
332 .clone();
333 self.operators.insert(
334 node.id,
335 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
336 );
337 }
338 Join {
339 join_type,
340 left,
341 right,
342 alias,
343 } => {
344 if node.inputs.len() != 2 {
346 return Err(Error(internal!("Join node must have exactly 2 inputs")));
347 }
348
349 let left_node = node.inputs[0];
350 let right_node = node.inputs[1];
351
352 let left_parent = self
353 .operators
354 .get(&left_node)
355 .ok_or_else(|| Error(internal!("Left parent operator not found")))?
356 .clone();
357
358 let right_parent = self
359 .operators
360 .get(&right_node)
361 .ok_or_else(|| Error(internal!("Right parent operator not found")))?
362 .clone();
363
364 self.operators.insert(
365 node.id,
366 Arc::new(Operators::Join(JoinOperator::new(
367 left_parent,
368 right_parent,
369 node.id,
370 join_type,
371 left_node,
372 right_node,
373 left,
374 right,
375 alias,
376 self.executor.clone(),
377 ))),
378 );
379 }
380 Distinct {
381 expressions,
382 } => {
383 let parent = self
384 .operators
385 .get(&node.inputs[0])
386 .ok_or_else(|| Error(internal!("Parent operator not found")))?
387 .clone();
388 self.operators.insert(
389 node.id,
390 Arc::new(Operators::Distinct(DistinctOperator::new(
391 parent,
392 node.id,
393 expressions,
394 self.executor.functions.clone(),
395 self.clock.clone(),
396 ))),
397 );
398 }
399 Append {} => {
400 if node.inputs.len() < 2 {
402 return Err(Error(internal!("Append node must have at least 2 inputs")));
403 }
404
405 let mut parents = Vec::with_capacity(node.inputs.len());
406
407 for input_node_id in &node.inputs {
408 let parent = self
409 .operators
410 .get(input_node_id)
411 .ok_or_else(|| {
412 Error(internal!(
413 "Parent operator not found for input {:?}",
414 input_node_id
415 ))
416 })?
417 .clone();
418 parents.push(parent);
419 }
420
421 self.operators.insert(
422 node.id,
423 Arc::new(Operators::Append(AppendOperator::new(
424 node.id,
425 parents,
426 node.inputs.clone(),
427 ))),
428 );
429 }
430 Apply {
431 operator,
432 expressions,
433 } => {
434 let config = evaluate_operator_config(
435 expressions.as_slice(),
436 &self.executor.functions,
437 &self.clock,
438 )?;
439
440 if let Some(factory) = self.custom_operators.get(operator.as_str()) {
441 let op = factory(node.id, &config)?;
442 self.operators.insert(node.id, Arc::new(Operators::Custom(op)));
443 } else {
444 #[cfg(reifydb_target = "native")]
445 {
446 let parent = self
447 .operators
448 .get(&node.inputs[0])
449 .ok_or_else(|| Error(internal!("Parent operator not found")))?
450 .clone();
451
452 if !self.is_ffi_operator(operator.as_str()) {
453 return Err(Error(internal!("Unknown operator: {}", operator)));
454 }
455
456 let ffi_op =
457 self.create_ffi_operator(operator.as_str(), node.id, &config)?;
458
459 self.operators.insert(
460 node.id,
461 Arc::new(Operators::Apply(ApplyOperator::new(
462 parent, node.id, ffi_op,
463 ))),
464 );
465 }
466 #[cfg(not(reifydb_target = "native"))]
467 {
468 let _ = operator;
469 return Err(Error(internal!(
470 "FFI operators are not supported in WASM"
471 )));
472 }
473 }
474 }
475 Aggregate {
476 ..
477 } => unimplemented!(),
478 Window {
479 window_type,
480 size,
481 slide,
482 group_by,
483 aggregations,
484 min_events,
485 max_window_count,
486 max_window_age,
487 } => {
488 let parent = self
489 .operators
490 .get(&node.inputs[0])
491 .ok_or_else(|| Error(internal!("Parent operator not found")))?
492 .clone();
493 let operator = WindowOperator::new(
494 parent,
495 node.id,
496 window_type.clone(),
497 size.clone(),
498 slide.clone(),
499 group_by.clone(),
500 aggregations.clone(),
501 min_events.clone(),
502 max_window_count.clone(),
503 max_window_age.clone(),
504 self.clock.clone(),
505 self.executor.functions.clone(),
506 );
507 self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
508 }
509 }
510
511 Ok(())
512 }
513
514 fn add_source(&mut self, flow: FlowId, node: FlowNodeId, source: PrimitiveId) {
515 let nodes = self.sources.entry(source).or_insert_with(Vec::new);
516
517 let entry = (flow, node);
518 if !nodes.contains(&entry) {
519 nodes.push(entry);
520 }
521 }
522
523 fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: PrimitiveId) {
524 let nodes = self.sinks.entry(sink).or_insert_with(Vec::new);
525
526 let entry = (flow, node);
527 if !nodes.contains(&entry) {
528 nodes.push(entry);
529 }
530 }
531}