1use std::{mem, sync::Arc};
5
6use postcard::from_bytes;
7use reifydb_core::{
8 interface::catalog::{
9 flow::{FlowId, FlowNodeId},
10 id::ViewId,
11 shape::ShapeId,
12 view::ViewKind,
13 },
14 internal,
15};
16use reifydb_rql::flow::{
17 flow::FlowDag,
18 node::{
19 FlowNode,
20 FlowNodeType::{
21 self, Aggregate, Append, Apply, Distinct, Extend, Filter, Gate, Join, Map, SinkRingBufferView,
22 SinkSeriesView, SinkSubscription, SinkTableView, Sort, SourceFlow, SourceInlineData,
23 SourceRingBuffer, SourceSeries, SourceTable, SourceView, Take, Window,
24 },
25 },
26};
27use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
28use reifydb_type::{Result, error::Error};
29use tracing::instrument;
30
31use super::eval::evaluate_operator_config;
32#[cfg(reifydb_target = "native")]
33use crate::operator::apply::ApplyOperator;
34use crate::{
35 engine::FlowEngine,
36 operator::{
37 Operators,
38 append::AppendOperator,
39 distinct::DistinctOperator,
40 extend::ExtendOperator,
41 filter::FilterOperator,
42 gate::GateOperator,
43 join::operator::{JoinOperator, JoinSideConfig},
44 map::MapOperator,
45 scan::{
46 flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator,
47 series::PrimitiveSeriesOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator,
48 },
49 sink::{
50 ringbuffer_view::SinkRingBufferViewOperator, series_view::SinkSeriesViewOperator,
51 view::SinkTableViewOperator,
52 },
53 sort::SortOperator,
54 take::TakeOperator,
55 window::{WindowConfig, WindowOperator},
56 },
57};
58
59impl FlowEngine {
60 #[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
61 pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> Result<()> {
62 self.register_with_transaction(&mut Transaction::Command(txn), flow)
63 }
64
65 #[instrument(name = "flow::register_with_transaction", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
66 pub fn register_with_transaction(&mut self, txn: &mut Transaction<'_>, flow: FlowDag) -> Result<()> {
67 debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
68
69 for node_id in flow.topological_order()? {
70 let node = flow.get_node(&node_id).unwrap();
71 self.add(txn, &flow, node)?;
72 }
73
74 self.analyzer.add(flow.clone());
75 self.flows.insert(flow.id, flow);
76
77 Ok(())
78 }
79
80 #[instrument(name = "flow::add", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?mem::discriminant(&node.ty)))]
81 pub fn add(&mut self, txn: &mut Transaction<'_>, flow: &FlowDag, node: &FlowNode) -> Result<()> {
82 debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
83 let node = node.clone();
84
85 match node.ty {
86 SourceInlineData {
87 ..
88 } => {
89 unimplemented!()
90 }
91 SourceTable {
92 table,
93 } => {
94 let table = self.catalog.get_table(&mut txn.reborrow(), table)?;
95
96 self.add_source(flow.id, node.id, ShapeId::table(table.id));
97 self.operators.insert(
98 node.id,
99 Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
100 );
101 }
102 SourceView {
103 view,
104 } => self.register_source_view(txn, flow, &node, view)?,
105 SourceFlow {
106 flow: source_flow,
107 } => {
108 let source_flow = self.catalog.get_flow(&mut txn.reborrow(), source_flow)?;
109 self.operators.insert(
110 node.id,
111 Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
112 node.id,
113 source_flow,
114 ))),
115 );
116 }
117 SourceRingBuffer {
118 ringbuffer,
119 } => {
120 let rb = self.catalog.get_ringbuffer(&mut txn.reborrow(), ringbuffer)?;
121 self.add_source(flow.id, node.id, ShapeId::ringbuffer(rb.id));
122 self.operators.insert(
123 node.id,
124 Arc::new(Operators::SourceRingBuffer(PrimitiveRingBufferOperator::new(
125 node.id, rb,
126 ))),
127 );
128 }
129 SourceSeries {
130 series,
131 } => {
132 let s = self.catalog.get_series(&mut txn.reborrow(), series)?;
133 self.add_source(flow.id, node.id, ShapeId::series(s.id));
134 self.operators.insert(
135 node.id,
136 Arc::new(Operators::SourceSeries(PrimitiveSeriesOperator::new(node.id, s))),
137 );
138 }
139 SinkTableView {
140 view,
141 table,
142 } => {
143 let parent = self
144 .operators
145 .get(&node.inputs[0])
146 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
147 .clone();
148
149 self.add_sink(flow.id, node.id, ShapeId::view(*view));
150 let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
151 self.operators.insert(
152 node.id,
153 Arc::new(Operators::SinkTableView(SinkTableViewOperator::new(
154 parent, node.id, resolved, table,
155 ))),
156 );
157 }
158 SinkRingBufferView {
159 view,
160 ringbuffer,
161 capacity,
162 propagate_evictions,
163 } => {
164 let parent = self
165 .operators
166 .get(&node.inputs[0])
167 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
168 .clone();
169 self.add_sink(flow.id, node.id, ShapeId::view(*view));
170 let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
171 self.operators.insert(
172 node.id,
173 Arc::new(Operators::SinkRingBufferView(SinkRingBufferViewOperator::new(
174 parent,
175 node.id,
176 resolved,
177 ringbuffer,
178 capacity,
179 propagate_evictions,
180 ))),
181 );
182 }
183 SinkSeriesView {
184 view,
185 series,
186 key,
187 } => {
188 let parent = self
189 .operators
190 .get(&node.inputs[0])
191 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
192 .clone();
193 self.add_sink(flow.id, node.id, ShapeId::view(*view));
194 let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
195 self.operators.insert(
196 node.id,
197 Arc::new(Operators::SinkSeriesView(SinkSeriesViewOperator::new(
198 parent,
199 node.id,
200 resolved,
201 series,
202 key.clone(),
203 ))),
204 );
205 }
206 SinkSubscription {
207 ..
208 } => {
209 return Err(Error(Box::new(internal!(
212 "SinkSubscription nodes are no longer supported in persistent flows"
213 ))));
214 }
215 Filter {
216 conditions,
217 } => {
218 let parent = self
219 .operators
220 .get(&node.inputs[0])
221 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
222 .clone();
223 self.operators.insert(
224 node.id,
225 Arc::new(Operators::Filter(FilterOperator::new(
226 parent,
227 node.id,
228 conditions,
229 self.executor.routines.clone(),
230 self.runtime_context.clone(),
231 ))),
232 );
233 }
234 Gate {
235 conditions,
236 } => {
237 let parent = self
238 .operators
239 .get(&node.inputs[0])
240 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
241 .clone();
242 self.operators.insert(
243 node.id,
244 Arc::new(Operators::Gate(GateOperator::new(
245 parent,
246 node.id,
247 conditions,
248 self.executor.routines.clone(),
249 self.runtime_context.clone(),
250 ))),
251 );
252 }
253 Map {
254 expressions,
255 } => {
256 let parent = self
257 .operators
258 .get(&node.inputs[0])
259 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
260 .clone();
261 self.operators.insert(
262 node.id,
263 Arc::new(Operators::Map(MapOperator::new(
264 parent,
265 node.id,
266 expressions,
267 self.executor.routines.clone(),
268 self.runtime_context.clone(),
269 ))),
270 );
271 }
272 Extend {
273 expressions,
274 } => {
275 let parent = self
276 .operators
277 .get(&node.inputs[0])
278 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
279 .clone();
280 self.operators.insert(
281 node.id,
282 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
283 );
284 }
285 Sort {
286 by: _,
287 } => {
288 let parent = self
289 .operators
290 .get(&node.inputs[0])
291 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
292 .clone();
293 self.operators.insert(
294 node.id,
295 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
296 );
297 }
298 Take {
299 limit,
300 } => {
301 let parent = self
302 .operators
303 .get(&node.inputs[0])
304 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
305 .clone();
306 self.operators.insert(
307 node.id,
308 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
309 );
310 }
311 Join {
312 join_type,
313 left,
314 right,
315 alias,
316 } => {
317 if node.inputs.len() != 2 {
318 return Err(Error(Box::new(internal!("Join node must have exactly 2 inputs"))));
319 }
320
321 let left_node = node.inputs[0];
322 let right_node = node.inputs[1];
323
324 let left_parent = self
325 .operators
326 .get(&left_node)
327 .ok_or_else(|| Error(Box::new(internal!("Left parent operator not found"))))?
328 .clone();
329
330 let right_parent = self
331 .operators
332 .get(&right_node)
333 .ok_or_else(|| Error(Box::new(internal!("Right parent operator not found"))))?
334 .clone();
335
336 self.operators.insert(
337 node.id,
338 Arc::new(Operators::Join(JoinOperator::new(
339 JoinSideConfig {
340 parent: left_parent,
341 node: left_node,
342 exprs: left,
343 },
344 JoinSideConfig {
345 parent: right_parent,
346 node: right_node,
347 exprs: right,
348 },
349 node.id,
350 join_type,
351 alias,
352 self.executor.clone(),
353 ))),
354 );
355 }
356 Distinct {
357 expressions,
358 } => {
359 let parent = self
360 .operators
361 .get(&node.inputs[0])
362 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
363 .clone();
364 self.operators.insert(
365 node.id,
366 Arc::new(Operators::Distinct(DistinctOperator::new(
367 parent,
368 node.id,
369 expressions,
370 self.executor.routines.clone(),
371 self.runtime_context.clone(),
372 ))),
373 );
374 }
375 Append => {
376 if node.inputs.len() < 2 {
377 return Err(Error(Box::new(internal!(
378 "Append node must have at least 2 inputs"
379 ))));
380 }
381
382 let mut parents = Vec::with_capacity(node.inputs.len());
383
384 for input_node_id in &node.inputs {
385 let parent = self
386 .operators
387 .get(input_node_id)
388 .ok_or_else(|| {
389 Error(Box::new(internal!(
390 "Parent operator not found for input {:?}",
391 input_node_id
392 )))
393 })?
394 .clone();
395 parents.push(parent);
396 }
397
398 self.operators.insert(
399 node.id,
400 Arc::new(Operators::Append(AppendOperator::new(
401 node.id,
402 parents,
403 node.inputs.clone(),
404 ))),
405 );
406 }
407 Apply {
408 operator,
409 expressions,
410 } => {
411 let config = evaluate_operator_config(
412 expressions.as_slice(),
413 &self.executor.routines,
414 &self.runtime_context,
415 )?;
416
417 if let Some(factory) = self.custom_operators.get(operator.as_str()) {
418 let op = factory(node.id, &config)?;
419 self.operators.insert(node.id, Arc::new(Operators::Custom(op)));
420 } else {
421 #[cfg(reifydb_target = "native")]
422 {
423 let parent = self
424 .operators
425 .get(&node.inputs[0])
426 .ok_or_else(|| {
427 Error(Box::new(internal!("Parent operator not found")))
428 })?
429 .clone();
430
431 if !self.is_ffi_operator(operator.as_str()) {
432 return Err(Error(Box::new(internal!(
433 "Unknown operator: {}",
434 operator
435 ))));
436 }
437
438 let ffi_op =
439 self.create_ffi_operator(operator.as_str(), node.id, &config)?;
440
441 self.operators.insert(
442 node.id,
443 Arc::new(Operators::Apply(ApplyOperator::new(
444 parent, node.id, ffi_op,
445 ))),
446 );
447 }
448 #[cfg(not(reifydb_target = "native"))]
449 {
450 let _ = operator;
451 return Err(Error(Box::new(internal!(
452 "FFI operators are not supported in WASM"
453 ))));
454 }
455 }
456 }
457 Aggregate {
458 ..
459 } => unimplemented!(),
460 Window {
461 kind,
462 group_by,
463 aggregations,
464 ts,
465 } => {
466 let parent = self
467 .operators
468 .get(&node.inputs[0])
469 .ok_or_else(|| Error(Box::new(internal!("Parent operator not found"))))?
470 .clone();
471 let operator = WindowOperator::new(WindowConfig {
472 parent,
473 node: node.id,
474 kind: kind.clone(),
475 group_by: group_by.clone(),
476 aggregations: aggregations.clone(),
477 ts: ts.clone(),
478 runtime_context: self.runtime_context.clone(),
479 routines: self.executor.routines.clone(),
480 });
481 self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
482 }
483 }
484
485 Ok(())
486 }
487
488 #[inline]
489 fn register_source_view(
490 &mut self,
491 txn: &mut Transaction<'_>,
492 flow: &FlowDag,
493 node: &FlowNode,
494 view: ViewId,
495 ) -> Result<()> {
496 let view = self.catalog.get_view(&mut txn.reborrow(), view)?;
497 self.add_source(flow.id, node.id, ShapeId::view(view.id()));
498
499 self.add_source(flow.id, node.id, view.underlying_id());
504
505 if view.kind() == ViewKind::Transactional {
518 let current_flow_is_transactional = flow.get_node_ids().any(|nid| {
519 if let Some(n) = flow.get_node(&nid) {
520 let sink_view = match &n.ty {
521 SinkTableView {
522 view,
523 ..
524 }
525 | SinkRingBufferView {
526 view,
527 ..
528 }
529 | SinkSeriesView {
530 view,
531 ..
532 } => Some(view),
533 _ => None,
534 };
535 sink_view
536 .and_then(|v| {
537 self.catalog.find_view(&mut txn.reborrow(), *v).ok().flatten()
538 })
539 .map(|v| v.kind() == ViewKind::Transactional)
540 .unwrap_or(false)
541 } else {
542 false
543 }
544 });
545
546 let current_flow_is_subscription = flow.get_node_ids().any(|nid| {
547 flow.get_node(&nid).map(|n| matches!(n.ty, SinkSubscription { .. })).unwrap_or(false)
548 });
549
550 if !current_flow_is_transactional && !current_flow_is_subscription {
551 let mut additional_sources = Vec::new();
552 if let Some(view_flow) = self.catalog.find_flow_by_name(
553 &mut txn.reborrow(),
554 view.namespace(),
555 view.name(),
556 )? {
557 let flow_nodes = self
558 .catalog
559 .list_flow_nodes_by_flow(&mut txn.reborrow(), view_flow.id)?;
560 for flow_node in &flow_nodes {
561 if let Ok(nt) = from_bytes::<FlowNodeType>(&flow_node.data)
562 && let Some(shape) = nt.primitive_source_shape_id()
563 {
564 additional_sources.push(shape);
565 }
566 }
567 }
568 for source in additional_sources {
569 self.add_source(flow.id, node.id, source);
570 }
571 }
572 }
573
574 self.operators
575 .insert(node.id, Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))));
576 Ok(())
577 }
578
579 pub fn add_source(&mut self, flow: FlowId, node: FlowNodeId, shape: ShapeId) {
580 let nodes = self.sources.entry(shape).or_default();
581
582 let entry = (flow, node);
583 if !nodes.contains(&entry) {
584 nodes.push(entry);
585 }
586 }
587
588 pub fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: ShapeId) {
589 let nodes = self.sinks.entry(sink).or_default();
590
591 let entry = (flow, node);
592 if !nodes.contains(&entry) {
593 nodes.push(entry);
594 }
595 }
596}