1use std::{mem, sync::Arc};
5
6use postcard::from_bytes;
7use reifydb_core::{
8 interface::catalog::{
9 flow::{FlowId, FlowNodeId},
10 schema::SchemaId,
11 view::ViewKind,
12 },
13 internal,
14};
15use reifydb_rql::flow::{
16 flow::FlowDag,
17 node::{
18 FlowNode,
19 FlowNodeType::{
20 self, Aggregate, Append, Apply, Distinct, Extend, Filter, Gate, Join, Map, SinkRingBufferView,
21 SinkSeriesView, SinkSubscription, SinkTableView, Sort, SourceFlow, SourceInlineData,
22 SourceRingBuffer, SourceSeries, SourceTable, SourceView, Take, Window,
23 },
24 },
25};
26use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
27use reifydb_type::{Result, error::Error};
28use tracing::instrument;
29
30use super::eval::evaluate_operator_config;
31#[cfg(reifydb_target = "native")]
32use crate::operator::apply::ApplyOperator;
33use crate::{
34 engine::FlowEngine,
35 operator::{
36 Operators,
37 append::AppendOperator,
38 distinct::DistinctOperator,
39 extend::ExtendOperator,
40 filter::FilterOperator,
41 gate::GateOperator,
42 join::operator::JoinOperator,
43 map::MapOperator,
44 scan::{
45 flow::PrimitiveFlowOperator, ringbuffer::PrimitiveRingBufferOperator,
46 series::PrimitiveSeriesOperator, table::PrimitiveTableOperator, view::PrimitiveViewOperator,
47 },
48 sink::{
49 ringbuffer_view::SinkRingBufferViewOperator, series_view::SinkSeriesViewOperator,
50 subscription::SinkSubscriptionOperator, view::SinkTableViewOperator,
51 },
52 sort::SortOperator,
53 take::TakeOperator,
54 window::WindowOperator,
55 },
56};
57
58impl FlowEngine {
59 #[instrument(name = "flow::register", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
60 pub fn register(&mut self, txn: &mut CommandTransaction, flow: FlowDag) -> Result<()> {
61 self.register_with_transaction(&mut Transaction::Command(txn), flow)
62 }
63
64 #[instrument(name = "flow::register_with_transaction", level = "debug", skip(self, txn), fields(flow_id = ?flow.id))]
65 pub fn register_with_transaction(&mut self, txn: &mut Transaction<'_>, flow: FlowDag) -> Result<()> {
66 debug_assert!(!self.flows.contains_key(&flow.id), "Flow already registered");
67
68 for node_id in flow.topological_order()? {
69 let node = flow.get_node(&node_id).unwrap();
70 self.add(txn, &flow, node)?;
71 }
72
73 self.analyzer.add(flow.clone());
74 self.flows.insert(flow.id, flow);
75
76 Ok(())
77 }
78
79 #[instrument(name = "flow::add", level = "debug", skip(self, txn, flow), fields(flow_id = ?flow.id, node_id = ?node.id, node_type = ?mem::discriminant(&node.ty)))]
80 fn add(&mut self, txn: &mut Transaction<'_>, flow: &FlowDag, node: &FlowNode) -> Result<()> {
81 debug_assert!(!self.operators.contains_key(&node.id), "Operator already registered");
82 let node = node.clone();
83
84 match node.ty {
85 SourceInlineData {
86 ..
87 } => {
88 unimplemented!()
89 }
90 SourceTable {
91 table,
92 } => {
93 let table = self.catalog.get_table(&mut txn.reborrow(), table)?;
94
95 self.add_source(flow.id, node.id, SchemaId::table(table.id));
96 self.operators.insert(
97 node.id,
98 Arc::new(Operators::SourceTable(PrimitiveTableOperator::new(node.id, table))),
99 );
100 }
101 SourceView {
102 view,
103 } => {
104 let view = self.catalog.get_view(&mut txn.reborrow(), view)?;
105 self.add_source(flow.id, node.id, SchemaId::view(view.id()));
106
107 if view.kind() == ViewKind::Deferred {
111 self.add_source(flow.id, node.id, view.underlying_id());
112 }
113
114 if view.kind() == ViewKind::Transactional {
120 let mut additional_sources = Vec::new();
121 if let Some(view_flow) = self.catalog.find_flow_by_name(
122 &mut txn.reborrow(),
123 view.namespace(),
124 view.name(),
125 )? {
126 let flow_nodes = self
127 .catalog
128 .list_flow_nodes_by_flow(&mut txn.reborrow(), view_flow.id)?;
129 for flow_node in &flow_nodes {
130 if flow_node.node_type == 1
132 || flow_node.node_type == 17 || flow_node.node_type == 18
133 {
134 if let Ok(nt) =
135 from_bytes::<FlowNodeType>(&flow_node.data)
136 {
137 match nt {
138 SourceTable {
139 table: t,
140 } => {
141 additional_sources.push(
142 SchemaId::table(t),
143 );
144 }
145 SourceRingBuffer {
146 ringbuffer: rb,
147 } => {
148 additional_sources.push(
149 SchemaId::ringbuffer(
150 rb,
151 ),
152 );
153 }
154 SourceSeries {
155 series: s,
156 } => {
157 additional_sources.push(
158 SchemaId::series(s),
159 );
160 }
161 _ => {}
162 }
163 }
164 }
165 }
166 }
167 for source in additional_sources {
168 self.add_source(flow.id, node.id, source);
169 }
170 }
171
172 self.operators.insert(
173 node.id,
174 Arc::new(Operators::SourceView(PrimitiveViewOperator::new(node.id, view))),
175 );
176 }
177 SourceFlow {
178 flow: source_flow,
179 } => {
180 let source_flow = self.catalog.get_flow(&mut txn.reborrow(), source_flow)?;
181 self.operators.insert(
182 node.id,
183 Arc::new(Operators::SourceFlow(PrimitiveFlowOperator::new(
184 node.id,
185 source_flow,
186 ))),
187 );
188 }
189 SourceRingBuffer {
190 ringbuffer,
191 } => {
192 let rb = self.catalog.get_ringbuffer(&mut txn.reborrow(), ringbuffer)?;
193 self.add_source(flow.id, node.id, SchemaId::ringbuffer(rb.id));
194 self.operators.insert(
195 node.id,
196 Arc::new(Operators::SourceRingBuffer(PrimitiveRingBufferOperator::new(
197 node.id, rb,
198 ))),
199 );
200 }
201 SourceSeries {
202 series,
203 } => {
204 let s = self.catalog.get_series(&mut txn.reborrow(), series)?;
205 self.add_source(flow.id, node.id, SchemaId::series(s.id));
206 self.operators.insert(
207 node.id,
208 Arc::new(Operators::SourceSeries(PrimitiveSeriesOperator::new(node.id, s))),
209 );
210 }
211 SinkTableView {
212 view,
213 table,
214 } => {
215 let parent = self
216 .operators
217 .get(&node.inputs[0])
218 .ok_or_else(|| Error(internal!("Parent operator not found")))?
219 .clone();
220
221 self.add_sink(flow.id, node.id, SchemaId::view(*view));
222 let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
223 self.operators.insert(
224 node.id,
225 Arc::new(Operators::SinkTableView(SinkTableViewOperator::new(
226 parent, node.id, resolved, table,
227 ))),
228 );
229 }
230 SinkRingBufferView {
231 view,
232 ringbuffer,
233 capacity,
234 propagate_evictions,
235 } => {
236 let parent = self
237 .operators
238 .get(&node.inputs[0])
239 .ok_or_else(|| Error(internal!("Parent operator not found")))?
240 .clone();
241 self.add_sink(flow.id, node.id, SchemaId::view(*view));
242 let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
243 self.operators.insert(
244 node.id,
245 Arc::new(Operators::SinkRingBufferView(SinkRingBufferViewOperator::new(
246 parent,
247 node.id,
248 resolved,
249 ringbuffer,
250 capacity,
251 propagate_evictions,
252 ))),
253 );
254 }
255 SinkSeriesView {
256 view,
257 series,
258 key,
259 } => {
260 let parent = self
261 .operators
262 .get(&node.inputs[0])
263 .ok_or_else(|| Error(internal!("Parent operator not found")))?
264 .clone();
265 self.add_sink(flow.id, node.id, SchemaId::view(*view));
266 let resolved = self.catalog.resolve_view(&mut txn.reborrow(), view)?;
267 self.operators.insert(
268 node.id,
269 Arc::new(Operators::SinkSeriesView(SinkSeriesViewOperator::new(
270 parent,
271 node.id,
272 resolved,
273 series,
274 key.clone(),
275 ))),
276 );
277 }
278 SinkSubscription {
279 subscription,
280 } => {
281 if node.inputs.is_empty() {
283 return Err(Error(internal!(
284 "SinkSubscription node has no inputs - flow may have been deleted during loading"
285 )));
286 }
287 let parent = self
288 .operators
289 .get(&node.inputs[0])
290 .ok_or_else(|| Error(internal!("Parent operator not found")))?
291 .clone();
292
293 let resolved = self.catalog.resolve_subscription(&mut txn.reborrow(), subscription)?;
296 self.operators.insert(
297 node.id,
298 Arc::new(Operators::SinkSubscription(SinkSubscriptionOperator::new(
299 parent, node.id, resolved,
300 ))),
301 );
302 }
303 Filter {
304 conditions,
305 } => {
306 let parent = self
307 .operators
308 .get(&node.inputs[0])
309 .ok_or_else(|| Error(internal!("Parent operator not found")))?
310 .clone();
311 self.operators.insert(
312 node.id,
313 Arc::new(Operators::Filter(FilterOperator::new(
314 parent,
315 node.id,
316 conditions,
317 self.executor.functions.clone(),
318 self.runtime_context.clone(),
319 ))),
320 );
321 }
322 Gate {
323 conditions,
324 } => {
325 let parent = self
326 .operators
327 .get(&node.inputs[0])
328 .ok_or_else(|| Error(internal!("Parent operator not found")))?
329 .clone();
330 self.operators.insert(
331 node.id,
332 Arc::new(Operators::Gate(GateOperator::new(
333 parent,
334 node.id,
335 conditions,
336 self.executor.functions.clone(),
337 self.runtime_context.clone(),
338 ))),
339 );
340 }
341 Map {
342 expressions,
343 } => {
344 let parent = self
345 .operators
346 .get(&node.inputs[0])
347 .ok_or_else(|| Error(internal!("Parent operator not found")))?
348 .clone();
349 self.operators.insert(
350 node.id,
351 Arc::new(Operators::Map(MapOperator::new(
352 parent,
353 node.id,
354 expressions,
355 self.executor.functions.clone(),
356 self.runtime_context.clone(),
357 ))),
358 );
359 }
360 Extend {
361 expressions,
362 } => {
363 let parent = self
364 .operators
365 .get(&node.inputs[0])
366 .ok_or_else(|| Error(internal!("Parent operator not found")))?
367 .clone();
368 self.operators.insert(
369 node.id,
370 Arc::new(Operators::Extend(ExtendOperator::new(parent, node.id, expressions))),
371 );
372 }
373 Sort {
374 by: _,
375 } => {
376 let parent = self
377 .operators
378 .get(&node.inputs[0])
379 .ok_or_else(|| Error(internal!("Parent operator not found")))?
380 .clone();
381 self.operators.insert(
382 node.id,
383 Arc::new(Operators::Sort(SortOperator::new(parent, node.id, Vec::new()))),
384 );
385 }
386 Take {
387 limit,
388 } => {
389 let parent = self
390 .operators
391 .get(&node.inputs[0])
392 .ok_or_else(|| Error(internal!("Parent operator not found")))?
393 .clone();
394 self.operators.insert(
395 node.id,
396 Arc::new(Operators::Take(TakeOperator::new(parent, node.id, limit))),
397 );
398 }
399 Join {
400 join_type,
401 left,
402 right,
403 alias,
404 } => {
405 if node.inputs.len() != 2 {
407 return Err(Error(internal!("Join node must have exactly 2 inputs")));
408 }
409
410 let left_node = node.inputs[0];
411 let right_node = node.inputs[1];
412
413 let left_parent = self
414 .operators
415 .get(&left_node)
416 .ok_or_else(|| Error(internal!("Left parent operator not found")))?
417 .clone();
418
419 let right_parent = self
420 .operators
421 .get(&right_node)
422 .ok_or_else(|| Error(internal!("Right parent operator not found")))?
423 .clone();
424
425 self.operators.insert(
426 node.id,
427 Arc::new(Operators::Join(JoinOperator::new(
428 left_parent,
429 right_parent,
430 node.id,
431 join_type,
432 left_node,
433 right_node,
434 left,
435 right,
436 alias,
437 self.executor.clone(),
438 ))),
439 );
440 }
441 Distinct {
442 expressions,
443 } => {
444 let parent = self
445 .operators
446 .get(&node.inputs[0])
447 .ok_or_else(|| Error(internal!("Parent operator not found")))?
448 .clone();
449 self.operators.insert(
450 node.id,
451 Arc::new(Operators::Distinct(DistinctOperator::new(
452 parent,
453 node.id,
454 expressions,
455 self.executor.functions.clone(),
456 self.runtime_context.clone(),
457 ))),
458 );
459 }
460 Append {} => {
461 if node.inputs.len() < 2 {
463 return Err(Error(internal!("Append node must have at least 2 inputs")));
464 }
465
466 let mut parents = Vec::with_capacity(node.inputs.len());
467
468 for input_node_id in &node.inputs {
469 let parent = self
470 .operators
471 .get(input_node_id)
472 .ok_or_else(|| {
473 Error(internal!(
474 "Parent operator not found for input {:?}",
475 input_node_id
476 ))
477 })?
478 .clone();
479 parents.push(parent);
480 }
481
482 self.operators.insert(
483 node.id,
484 Arc::new(Operators::Append(AppendOperator::new(
485 node.id,
486 parents,
487 node.inputs.clone(),
488 ))),
489 );
490 }
491 Apply {
492 operator,
493 expressions,
494 } => {
495 let config = evaluate_operator_config(
496 expressions.as_slice(),
497 &self.executor.functions,
498 &self.runtime_context,
499 )?;
500
501 if let Some(factory) = self.custom_operators.get(operator.as_str()) {
502 let op = factory(node.id, &config)?;
503 self.operators.insert(node.id, Arc::new(Operators::Custom(op)));
504 } else {
505 #[cfg(reifydb_target = "native")]
506 {
507 let parent = self
508 .operators
509 .get(&node.inputs[0])
510 .ok_or_else(|| Error(internal!("Parent operator not found")))?
511 .clone();
512
513 if !self.is_ffi_operator(operator.as_str()) {
514 return Err(Error(internal!("Unknown operator: {}", operator)));
515 }
516
517 let ffi_op =
518 self.create_ffi_operator(operator.as_str(), node.id, &config)?;
519
520 self.operators.insert(
521 node.id,
522 Arc::new(Operators::Apply(ApplyOperator::new(
523 parent, node.id, ffi_op,
524 ))),
525 );
526 }
527 #[cfg(not(reifydb_target = "native"))]
528 {
529 let _ = operator;
530 return Err(Error(internal!(
531 "FFI operators are not supported in WASM"
532 )));
533 }
534 }
535 }
536 Aggregate {
537 ..
538 } => unimplemented!(),
539 Window {
540 kind,
541 group_by,
542 aggregations,
543 ts,
544 } => {
545 let parent = self
546 .operators
547 .get(&node.inputs[0])
548 .ok_or_else(|| Error(internal!("Parent operator not found")))?
549 .clone();
550 let operator = WindowOperator::new(
551 parent,
552 node.id,
553 kind.clone(),
554 group_by.clone(),
555 aggregations.clone(),
556 ts.clone(),
557 self.runtime_context.clone(),
558 self.executor.functions.clone(),
559 );
560 self.operators.insert(node.id, Arc::new(Operators::Window(operator)));
561 }
562 }
563
564 Ok(())
565 }
566
567 fn add_source(&mut self, flow: FlowId, node: FlowNodeId, schema: SchemaId) {
568 let nodes = self.sources.entry(schema).or_insert_with(Vec::new);
569
570 let entry = (flow, node);
571 if !nodes.contains(&entry) {
572 nodes.push(entry);
573 }
574 }
575
576 fn add_sink(&mut self, flow: FlowId, node: FlowNodeId, sink: SchemaId) {
577 let nodes = self.sinks.entry(sink).or_insert_with(Vec::new);
578
579 let entry = (flow, node);
580 if !nodes.contains(&entry) {
581 nodes.push(entry);
582 }
583 }
584}